diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
new file mode 100644
index 00000000..241741d6
--- /dev/null
+++ b/.github/workflows/ci.yaml
@@ -0,0 +1,24 @@
+name: ci
+
+on:
+  pull_request:
+    branches:
+      - "*"
+
+jobs:
+  test:
+    uses: signoz/primus.workflows/.github/workflows/go-test.yaml@main
+    secrets: inherit
+    with:
+      PRIMUS_REF: main
+      GO_TEST_CONTEXT: ./...
+  fmt:
+    uses: signoz/primus.workflows/.github/workflows/go-fmt.yaml@main
+    secrets: inherit
+    with:
+      PRIMUS_REF: main
+  lint:
+    uses: signoz/primus.workflows/.github/workflows/go-lint.yaml@main
+    secrets: inherit
+    with:
+      PRIMUS_REF: main
\ No newline at end of file
diff --git a/.github/workflows/codeql-analysis.yaml b/.github/workflows/codeql-analysis.yaml
deleted file mode 100644
index ca71f3df..00000000
--- a/.github/workflows/codeql-analysis.yaml
+++ /dev/null
@@ -1,49 +0,0 @@
-name: "CodeQL Analysis"
-
-on:
-  push:
-    branches:
-      - main
-  pull_request:
-    branches:
-      - main
-  schedule:
-    - cron: "0 22 * * 6"
-
-jobs:
-  analyze:
-    name: Analyze
-    runs-on: ubuntu-latest
-    permissions:
-      actions: read
-      contents: read
-      security-events: write
-
-    steps:
-      - name: Checkout repository
-        uses: actions/checkout@v3
-
-      - name: Set up Go
-        uses: actions/setup-go@v3
-        with:
-          go-version: 1.21
-
-      - name: Initialize CodeQL
-        uses: github/codeql-action/init@v2
-        with:
-          languages: go
-
-      - name: Autobuild
-        uses: github/codeql-action/autobuild@v2
-
-      - name: Install cross-compilation tools
-        run: |
-          set -ex
-          sudo apt-get update
-          sudo apt-get install -y gcc-aarch64-linux-gnu musl-tools
-
-      - name: Build Artifacts
-        run: make build-all
-
-      - name: Perform CodeQL Analysis
-        uses: github/codeql-action/analyze@v2
diff --git a/.github/workflows/lint-and-test.yaml b/.github/workflows/lint-and-test.yaml
deleted file mode 100644
index 0d3bcd89..00000000
--- a/.github/workflows/lint-and-test.yaml
+++ /dev/null
@@ -1,24 +0,0 @@
-name: test-pipeline
-
-on:
-  pull_request:
-    branches:
-      - "**"
-
-jobs:
-  lint-and-test:
-    runs-on: ubuntu-latest
-    steps:
-      - name: Checkout code
-        uses: actions/checkout@v3
-
-      - name: Set up Go
-        uses: actions/setup-go@v2
-        with:
-          go-version: 1.21
-
-      - name: Install tools
-        run: make install-ci
-
-      - name: Run unit tests and lint
-        run: make test-and-lint
diff --git a/exporter/clickhouselogsexporter/config.go b/exporter/clickhouselogsexporter/config.go
index 4eb49f2d..33b3f0c1 100644
--- a/exporter/clickhouselogsexporter/config.go
+++ b/exporter/clickhouselogsexporter/config.go
@@ -34,6 +34,7 @@ type Config struct {
 	DSN string `mapstructure:"dsn"`
 	// Docker Multi Node Cluster is a flag to enable the docker multi node cluster. Default is false.
 	DockerMultiNodeCluster bool `mapstructure:"docker_multi_node_cluster" default:"false"`
+	UseNewSchema bool `mapstructure:"use_new_schema" default:"false"`
 }
 
 var (
diff --git a/exporter/clickhouselogsexporter/config_test.go b/exporter/clickhouselogsexporter/config_test.go
index 1b6ab332..7fcf3400 100644
--- a/exporter/clickhouselogsexporter/config_test.go
+++ b/exporter/clickhouselogsexporter/config_test.go
@@ -37,7 +37,7 @@ func TestLoadConfig(t *testing.T) {
 	require.NoError(t, err)
 	require.NotNil(t, cfg)
 
-	assert.Equal(t, len(cfg.Exporters), 2)
+	assert.Equal(t, len(cfg.Exporters), 3)
 
 	defaultCfg := factory.CreateDefaultConfig()
 	defaultCfg.(*Config).DSN = "tcp://127.0.0.1:9000/?dial_timeout=5s"
@@ -64,4 +64,8 @@ func TestLoadConfig(t *testing.T) {
 			QueueSize: 100,
 		},
 	})
+
+	defaultCfg.(*Config).UseNewSchema = true
+	r2 := cfg.Exporters[component.NewIDWithName(component.MustNewType(typeStr), "new_schema")]
+	assert.Equal(t, r2, defaultCfg)
 }
diff --git a/exporter/clickhouselogsexporter/exporter.go b/exporter/clickhouselogsexporter/exporter.go
index 8d8c989d..7775bbe2 100644
--- a/exporter/clickhouselogsexporter/exporter.go
+++ b/exporter/clickhouselogsexporter/exporter.go
@@ -27,6 +27,7 @@ import (
 
 	"github.com/ClickHouse/clickhouse-go/v2"
 	driver "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
+	"github.com/SigNoz/signoz-otel-collector/exporter/clickhouselogsexporter/logsv2"
 	"github.com/SigNoz/signoz-otel-collector/usage"
 	"github.com/SigNoz/signoz-otel-collector/utils"
 	"github.com/google/uuid"
@@ -41,15 +42,18 @@ import (
 )
 
 const (
-	DISTRIBUTED_LOGS_TABLE     = "distributed_logs"
-	DISTRIBUTED_TAG_ATTRIBUTES = "distributed_tag_attributes"
+	DISTRIBUTED_LOGS_TABLE               = "distributed_logs"
+	DISTRIBUTED_TAG_ATTRIBUTES           = "distributed_tag_attributes"
+	DISTRIBUTED_LOGS_TABLE_V2            = "distributed_logs_v2"
+	DISTRIBUTED_LOGS_RESOURCE_V2         = "distributed_logs_v2_resource"
+	DISTRIBUTED_LOGS_RESOURCE_V2_SECONDS = 1800
 )
 
 type clickhouseLogsExporter struct {
-	id            uuid.UUID
-	db            clickhouse.Conn
-	insertLogsSQL string
-	ksuid         ksuid.KSUID
+	id              uuid.UUID
+	db              clickhouse.Conn
+	insertLogsSQL   string
+	insertLogsSQLV2 string
 
 	logger *zap.Logger
 	cfg    *Config
@@ -58,6 +62,8 @@ type clickhouseLogsExporter struct {
 
 	wg        *sync.WaitGroup
 	closeChan chan struct{}
+
+	useNewSchema bool
 }
 
 func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, error) {
@@ -71,6 +77,7 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, erro
 	}
 
 	insertLogsSQL := renderInsertLogsSQL(cfg)
+	insertLogsSQLV2 := renderInsertLogsSQLV2(cfg)
 	id := uuid.New()
 	collector := usage.NewUsageCollector(
 		id,
@@ -93,15 +100,16 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, erro
 	}
 
 	return &clickhouseLogsExporter{
-		id:             id,
-		db:             client,
-		insertLogsSQL:  insertLogsSQL,
-		logger:         logger,
-		cfg:            cfg,
-		ksuid:          ksuid.New(),
-		usageCollector: collector,
-		wg:             new(sync.WaitGroup),
-		closeChan:      make(chan struct{}),
+		id:              id,
+		db:              client,
+		insertLogsSQL:   insertLogsSQL,
+		insertLogsSQLV2: insertLogsSQLV2,
+		logger:          logger,
+		cfg:             cfg,
+		usageCollector:  collector,
+		wg:              new(sync.WaitGroup),
+		closeChan:       make(chan struct{}),
+		useNewSchema:    cfg.UseNewSchema,
 	}, nil
 }
 
@@ -184,7 +192,7 @@ func (e *clickhouseLogsExporter) pushLogsData(ctx context.Context, ld plog.Logs)
 		if err != nil {
 			// StatementSend:code: 252, message: Too many partitions for single INSERT block
 			// iterating twice since we want to try once after removing the old data
-			if i == 1 || !strings.Contains(err.Error(), "StatementSend:code: 252") {
+			if i == 1 || !strings.Contains(err.Error(), "code: 252") {
 				// TODO(nitya): after returning it will be retried, ideally it should be pushed to DLQ
 				return err
 			}
@@ -202,7 +210,15 @@ func (e *clickhouseLogsExporter) pushLogsData(ctx context.Context, ld plog.Logs)
 	return nil
 }
 
+func tsBucket(ts int64, bucketSize int64) int64 {
+	return (int64(ts) / int64(bucketSize)) * int64(bucketSize)
+}
+
 func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.Logs) error {
+	resourcesSeen := map[int64]map[string]string{}
+
+	var insertLogsStmtV2 driver.Batch
+	var insertResourcesStmtV2 driver.Batch
 	var statement driver.Batch
 	var tagStatement driver.Batch
 	var err error
@@ -214,6 +230,12 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
 		if tagStatement != nil {
 			_ = tagStatement.Abort()
 		}
+		if insertLogsStmtV2 != nil {
+			_ = insertLogsStmtV2.Abort()
+		}
+		if insertResourcesStmtV2 != nil {
+			_ = insertResourcesStmtV2.Abort()
+		}
 	}()
 
 	select {
@@ -221,9 +243,13 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
 		return errors.New("shutdown has been called")
 	default:
 		start := time.Now()
-		statement, err = e.db.PrepareBatch(ctx, e.insertLogsSQL, driver.WithReleaseConnection())
-		if err != nil {
-			return fmt.Errorf("PrepareBatch:%w", err)
+		chLen := 2
+		if !e.useNewSchema {
+			chLen = 3
+			statement, err = e.db.PrepareBatch(ctx, e.insertLogsSQL, driver.WithReleaseConnection())
+			if err != nil {
+				return fmt.Errorf("PrepareBatch:%w", err)
+			}
 		}
 
 		tagStatement, err = e.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", databaseName, DISTRIBUTED_TAG_ATTRIBUTES), driver.WithReleaseConnection())
@@ -231,6 +257,11 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
 			return fmt.Errorf("PrepareTagBatch:%w", err)
 		}
 
+		insertLogsStmtV2, err = e.db.PrepareBatch(ctx, e.insertLogsSQLV2, driver.WithReleaseConnection())
+		if err != nil {
+			return fmt.Errorf("PrepareBatchV2:%w", err)
+		}
+
 		metrics := map[string]usage.Metric{}
 
 		for i := 0; i < ld.ResourceLogs().Len(); i++ {
@@ -240,7 +271,15 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
 
 			resources := attributesToSlice(res.Attributes(), true)
 
-			err := addAttrsToTagStatement(tagStatement, "resource", resources)
+			resourcesMap := attributesToMap(res.Attributes(), true)
+
+			serializedRes, err := json.Marshal(res.Attributes().AsRaw())
+			if err != nil {
+				return fmt.Errorf("couldn't serialize log resource JSON: %w", err)
+			}
+			resourceJson := string(serializedRes)
+
+			err = addAttrsToTagStatement(tagStatement, "resource", resources, e.useNewSchema)
 			if err != nil {
 				return err
 			}
@@ -254,8 +293,9 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
 			scopeVersion := scope.Version()
 
 			scopeAttributes := attributesToSlice(scope.Attributes(), true)
+			scopeMap := attributesToMap(scope.Attributes(), true)
 
-			err := addAttrsToTagStatement(tagStatement, "scope", scopeAttributes)
+			err := addAttrsToTagStatement(tagStatement, "scope", scopeAttributes, e.useNewSchema)
 			if err != nil {
 				return err
 			}
@@ -279,9 +319,27 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
 					ts = ots
 				}
 
+				// generate the id from the timestamp
+				id, err := ksuid.NewRandomWithTime(time.Unix(0, int64(ts)))
+				if err != nil {
+					return fmt.Errorf("IdGenError:%w", err)
+				}
+
+				lBucketStart := tsBucket(int64(ts/1000000000), DISTRIBUTED_LOGS_RESOURCE_V2_SECONDS)
+
+				if _, exists := resourcesSeen[int64(lBucketStart)]; !exists {
+					resourcesSeen[int64(lBucketStart)] = map[string]string{}
+				}
+				fp, exists := resourcesSeen[int64(lBucketStart)][resourceJson]
+				if !exists {
+					fp = logsv2.CalculateFingerprint(res.Attributes().AsRaw(), logsv2.ResourceHierarchy())
+					resourcesSeen[int64(lBucketStart)][resourceJson] = fp
+				}
+
 				attributes := attributesToSlice(r.Attributes(), false)
+				attrsMap := attributesToMap(r.Attributes(), false)
 
-				err := addAttrsToTagStatement(tagStatement, "tag", attributes)
+				err = addAttrsToTagStatement(tagStatement, "tag", attributes, e.useNewSchema)
 				if err != nil {
 					return err
 				}
@@ -289,50 +347,116 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
 				// remove after sometime
 				attributes = addTemporaryUnderscoreSupport(attributes)
 
-				err = statement.Append(
+				err = insertLogsStmtV2.Append(
+					uint64(lBucketStart),
+					fp,
 					ts,
 					ots,
-					e.ksuid.String(),
+					id.String(),
 					utils.TraceIDToHexOrEmptyString(r.TraceID()),
 					utils.SpanIDToHexOrEmptyString(r.SpanID()),
 					uint32(r.Flags()),
 					r.SeverityText(),
 					uint8(r.SeverityNumber()),
 					getStringifiedBody(r.Body()),
-					resources.StringKeys,
-					resources.StringValues,
-					attributes.StringKeys,
-					attributes.StringValues,
-					attributes.IntKeys,
-					attributes.IntValues,
-					attributes.FloatKeys,
-					attributes.FloatValues,
-					attributes.BoolKeys,
-					attributes.BoolValues,
+					attrsMap.StringData,
+					attrsMap.NumberData,
+					attrsMap.BoolData,
+					resourcesMap.StringData,
 					scopeName,
 					scopeVersion,
-					scopeAttributes.StringKeys,
-					scopeAttributes.StringValues,
 				)
+				if err != nil {
+					return fmt.Errorf("StatementAppendLogsV2:%w", err)
+				}
+
+				// old table
+				if !e.useNewSchema {
+					err = statement.Append(
+						ts,
+						ots,
+						id.String(),
+						utils.TraceIDToHexOrEmptyString(r.TraceID()),
+						utils.SpanIDToHexOrEmptyString(r.SpanID()),
+						uint32(r.Flags()),
+						r.SeverityText(),
+						uint8(r.SeverityNumber()),
+						getStringifiedBody(r.Body()),
+						resources.StringKeys,
+						resources.StringValues,
+						attributes.StringKeys,
+						attributes.StringValues,
+						attributes.IntKeys,
+						attributes.IntValues,
+						attributes.FloatKeys,
+						attributes.FloatValues,
+						attributes.BoolKeys,
+						attributes.BoolValues,
+						scopeName,
+						scopeVersion,
+						scopeAttributes.StringKeys,
+						scopeAttributes.StringValues,
+					)
+				}
 				if err != nil {
 					return fmt.Errorf("StatementAppend:%w", err)
 				}
-				e.ksuid = e.ksuid.Next()
 			}
 		}
 	}
-	dbWriteStart := time.Now()
-	err = statement.Send()
-	stats.RecordWithTags(ctx,
-		[]tag.Mutator{
-			tag.Upsert(exporterKey, component.DataTypeLogs.String()),
-			tag.Upsert(tableKey, DISTRIBUTED_LOGS_TABLE),
-		},
-		writeLatencyMillis.M(int64(time.Since(dbWriteStart).Milliseconds())),
+
+	insertResourcesStmtV2, err = e.db.PrepareBatch(
+		ctx,
+		fmt.Sprintf("INSERT into %s.%s", databaseName, DISTRIBUTED_LOGS_RESOURCE_V2),
+		driver.WithReleaseConnection(),
 	)
 	if err != nil {
-		return fmt.Errorf("StatementSend:%w", err)
+		return fmt.Errorf("couldn't PrepareBatch for inserting resource fingerprints: %w", err)
+	}
+
+	for bucketTs, resources := range resourcesSeen {
+		for resourceLabels, fingerprint := range resources {
+			insertResourcesStmtV2.Append(
+				resourceLabels,
+				fingerprint,
+				bucketTs,
+			)
+		}
+	}
+
+	var wg sync.WaitGroup
+	chErr := make(chan error, chLen)
+	chDuration := make(chan statementSendDuration, chLen)
+
+	wg.Add(chLen)
+	if !e.useNewSchema {
+		go send(statement, DISTRIBUTED_LOGS_TABLE, chDuration, chErr, &wg)
 	}
+	go send(insertLogsStmtV2, DISTRIBUTED_LOGS_TABLE_V2, chDuration, chErr, &wg)
+	go send(insertResourcesStmtV2, DISTRIBUTED_LOGS_RESOURCE_V2, chDuration, chErr, &wg)
+	wg.Wait()
+	close(chErr)
+
+	// store the duration for sending the data
+	for i := 0; i < chLen; i++ {
+		sendDuration := <-chDuration
+		stats.RecordWithTags(ctx,
+			[]tag.Mutator{
+				tag.Upsert(exporterKey, component.DataTypeLogs.String()),
+				tag.Upsert(tableKey, sendDuration.Name),
+			},
+			writeLatencyMillis.M(int64(sendDuration.duration.Milliseconds())),
+		)
+	}
+
+	// check the errors
+	for i := 0; i < chLen; i++ {
+		if r := <-chErr; r != nil {
+			return fmt.Errorf("StatementSend:%w", r)
+		}
+	}
+
 	duration := time.Since(start)
 	e.logger.Debug("insert logs", zap.Int("records", ld.LogRecordCount()), zap.String("cost", duration.String()))
@@ -359,6 +483,22 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
 	}
 }
 
+type statementSendDuration struct {
+	Name     string
+	duration time.Duration
+}
+
+func send(statement driver.Batch, tableName string, durationCh chan<- statementSendDuration, chErr chan<- error, wg *sync.WaitGroup) {
+	defer wg.Done()
+	start := time.Now()
+	err := statement.Send()
+	chErr <- err
+	durationCh <- statementSendDuration{
+		Name:     tableName,
+		duration: time.Since(start),
+	}
+}
+
 type attributesToSliceResponse struct {
 	StringKeys   []string
 	StringValues []string
@@ -381,7 +521,7 @@ func getStringifiedBody(body pcommon.Value) string {
 	return strBody
 }
 
-func addAttrsToTagStatement(statement driver.Batch, tagType string, attrs attributesToSliceResponse) error {
+func addAttrsToTagStatement(statement driver.Batch, tagType string, attrs attributesToSliceResponse, useNewSchema bool) error {
 	for i, v := range attrs.StringKeys {
 		err := statement.Append(
 			time.Now(),
@@ -396,15 +536,20 @@ func addAttrsToTagStatement(statement driver.Batch, tagType string, attrs attrib
 			return fmt.Errorf("could not append string attribute to batch, err: %s", err)
 		}
 	}
+
+	intTypeName := "int64"
+	if useNewSchema {
+		intTypeName = "float64"
+	}
 	for i, v := range attrs.IntKeys {
 		err := statement.Append(
 			time.Now(),
 			v,
 			tagType,
-			"int64",
+			intTypeName,
 			nil,
-			attrs.IntValues[i],
 			nil,
+			attrs.IntValues[i],
 		)
 		if err != nil {
 			return fmt.Errorf("could not append number attribute to batch, err: %s", err)
@@ -441,9 +586,40 @@ func addAttrsToTagStatement(statement driver.Batch, tagType string, attrs attrib
 	return nil
 }
 
+type attributeMap struct {
+	StringData map[string]string
+	NumberData map[string]float64
+	BoolData   map[string]bool
+}
+
+func attributesToMap(attributes pcommon.Map, forceStringValues bool) (response attributeMap) {
+	response.BoolData = map[string]bool{}
+	response.StringData = map[string]string{}
+	response.NumberData = map[string]float64{}
+	attributes.Range(func(k string, v pcommon.Value) bool {
+		if forceStringValues {
+			// store everything as string
+			response.StringData[k] = v.AsString()
+		} else {
+			switch v.Type() {
+			case pcommon.ValueTypeInt:
+				response.NumberData[k] = float64(v.Int())
+			case pcommon.ValueTypeDouble:
+				response.NumberData[k] = v.Double()
+			case pcommon.ValueTypeBool:
+				response.BoolData[k] = v.Bool()
+			default: // store it as string
+				response.StringData[k] = v.AsString()
+			}
+		}
+
+		return true
+	})
+	return response
+}
+
 func attributesToSlice(attributes pcommon.Map, forceStringValues bool) (response attributesToSliceResponse) {
 	attributes.Range(func(k string, v pcommon.Value) bool {
-		// Support for . , remove the above section once done.
 		if forceStringValues {
 			// store everything as string
 			response.StringKeys = append(response.StringKeys, k)
@@ -519,7 +695,7 @@ const (
 				body,
 				resources_string_key,
 				resources_string_value,
-				attributes_string_key, 
+				attributes_string_key,
 				attributes_string_value,
 				attributes_int64_key,
 				attributes_int64_value,
@@ -558,6 +734,49 @@ const (
 	)`
 )
 
+const (
+	// language=ClickHouse SQL
+	insertLogsSQLTemplateV2 = `INSERT INTO %s.%s (
+		ts_bucket_start,
+		resource_fingerprint,
+		timestamp,
+		observed_timestamp,
+		id,
+		trace_id,
+		span_id,
+		trace_flags,
+		severity_text,
+		severity_number,
+		body,
+		attributes_string,
+		attributes_number,
+		attributes_bool,
+		resources_string,
+		scope_name,
+		scope_version,
+		scope_string
+	) VALUES (
+		?,
+		?,
+		?,
+		?,
+		?,
+		?,
+		?,
+		?,
+		?,
+		?,
+		?,
+		?,
+		?,
+		?,
+		?,
+		?,
+		?,
+		?
+	)`
+)
+
 // newClickhouseClient create a clickhouse client.
 func newClickhouseClient(logger *zap.Logger, cfg *Config) (clickhouse.Conn, error) {
 	// use empty database to create database
@@ -588,3 +807,7 @@ func newClickhouseClient(logger *zap.Logger, cfg *Config) (clickhouse.Conn, erro
 func renderInsertLogsSQL(cfg *Config) string {
 	return fmt.Sprintf(insertLogsSQLTemplate, databaseName, DISTRIBUTED_LOGS_TABLE)
 }
+
+func renderInsertLogsSQLV2(cfg *Config) string {
+	return fmt.Sprintf(insertLogsSQLTemplateV2, databaseName, DISTRIBUTED_LOGS_TABLE_V2)
+}
diff --git a/exporter/clickhouselogsexporter/logsv2/fingerprint.go b/exporter/clickhouselogsexporter/logsv2/fingerprint.go
new file mode 100644
index 00000000..9bb55666
--- /dev/null
+++ b/exporter/clickhouselogsexporter/logsv2/fingerprint.go
@@ -0,0 +1,201 @@
+package logsv2
+
+import (
+	"fmt"
+	"strings"
+)
+
+type DimensionHierarchyNode struct {
+	// labels that map to this node in dimension hierarchy
+	labels []string
+
+	// List of potential subhierarchies in the order of preference.
+	// Eg: k8s.cluster.name can have subhierarchies of k8s.namespace.name or k8s.node.name - the 2 ways of grouping/organizing k8s resources.
+	// In most cases this list will have only one entry
+	subHierachies []DimensionHierarchyNode
+}
+
+type IdLabelValue struct {
+	Label string
+	Value any
+}
+
+// Returns list of dimension labels for a set of attributes for a DimensionHierarchy
+func (node *DimensionHierarchyNode) Identifier(attributes map[string]any) []IdLabelValue {
+	result := []IdLabelValue{}
+
+	for _, l := range node.labels {
+		if lVal, exists := attributes[l]; exists {
+			result = append(result, IdLabelValue{
+				Label: l,
+				Value: lVal,
+			})
+			break
+		}
+	}
+
+	for _, s := range node.subHierachies {
+		subLabels := s.Identifier(attributes)
+		if len(subLabels) > 0 {
+			result = append(result, subLabels...)
+			break
+		}
+	}
+
+	return result
+}
+
+// TODO(Raj/Nitya): Consider parsing this stuff out from json
+func ResourceHierarchy() *DimensionHierarchyNode {
+	return &DimensionHierarchyNode{
+		labels: []string{
+			"cloud.provider",
+		},
+		subHierachies: []DimensionHierarchyNode{{
+			labels: []string{"cloud.account.id"},
+
+			subHierachies: []DimensionHierarchyNode{{
+				labels: []string{"gcp.project"},
+
+				subHierachies: []DimensionHierarchyNode{{
+					labels: []string{
+						"cloud.region",
+						"aws.region",
+					},
+
+					subHierachies: []DimensionHierarchyNode{{
+						labels: []string{
+							"cloud.platform",
+							"source_type",
+						},
+
+						subHierachies: []DimensionHierarchyNode{{
+							labels: []string{
+								"k8s.cluster.name",
+								"k8s.cluster.uid",
+								"aws.ecs.cluster.arn",
+							},
+
+							subHierachies: []DimensionHierarchyNode{
+								// Logical/service oriented view
+								{
+									labels: []string{
+										"service.namespace",
+										"k8s.namespace.name",
+										"ec2.tag.service-group", // is this standard enough?
+									},
+
+									subHierachies: []DimensionHierarchyNode{{
+										labels: []string{
+											"service.name",
+											"cloudwatch.log.group.name",
+											"k8s.deployment.name",
+											"k8s.deployment.uid",
+											"k8s.statefulset.name",
+											"k8s.statefulset.uid",
+											"k8s.daemonset.name",
+											"k8s.daemonset.uid",
+											"k8s.job.name",
+											"k8s.job.uid",
+											"k8s.cronjob.name",
+											"k8s.cronjob.uid",
+											"faas.name",
+											"ec2.tag.service", // is this standard enough?
+										},
+
+										subHierachies: []DimensionHierarchyNode{{
+											labels: []string{
+												"deployment.environment",
+												"ec2.tag.env-short", // is this standard enough?
+												"ec2.tag.env",       // is this standard enough?
+											},
+
+											subHierachies: []DimensionHierarchyNode{{
+												labels: []string{
+													"service.instance.id",
+													"k8s.pod.name",
+													"k8s.pod.uid",
+													"aws.ecs.task.id",
+													"aws.ecs.task.arn",
+													"cloudwatch.log.stream",
+													"cloud.resource_id",
+													"faas.instance",
+													"host.id",
+													"host.name",
+													"host.ip",
+													"host",
+												},
+
+												subHierachies: []DimensionHierarchyNode{{
+													labels: []string{
+														"k8s.container.name",
+														"container.name",
+														"container_name",
+													},
+												}},
+											}},
+										}},
+									}},
+								},
+
+								// Node oriented view
+								{
+									labels: []string{"cloud.availability_zone"},
+
+									subHierachies: []DimensionHierarchyNode{{
+										labels: []string{
+											"k8s.node.name",
+											"k8s.node.uid",
+											"host.id",
+											"host.name",
+											"host.ip",
+											"host",
+										},
+
+										subHierachies: []DimensionHierarchyNode{{
+											labels: []string{
+												"k8s.pod.name",
+												"k8s.pod.uid",
+												"aws.ecs.task.id",
+												"aws.ecs.task.arn",
+											},
+
+											subHierachies: []DimensionHierarchyNode{{
+												labels: []string{
+													"k8s.container.name",
+													"container.name",
+												},
+											}},
+										}},
+									}},
+								},
+							},
+						}},
+					}},
+				}},
+			}},
+		}},
+	}
+}
+
+// Calculates fingerprint for attributes that when sorted would keep fingerprints
+// for the same set of attributes next to each other while also colocating
+// entries at all levels of the hierarchy.
+// For example, fingerprints like "k8s.deployment.name=webserver;k8s.pod.name=webserver-0"
+// will colocate logs for each webserver pod while also colocating all logs for the webserver deployment
+func CalculateFingerprint(
+	attributes map[string]any, hierarchy *DimensionHierarchyNode,
+) string {
+	id := hierarchy.Identifier(attributes)
+
+	fingerprintParts := []string{}
+	for _, idLabel := range id {
+		fingerprintParts = append(
+			fingerprintParts, fmt.Sprintf("%s=%s", idLabel.Label, idLabel.Value),
+		)
+	}
+
+	hash := FingerprintHash(attributes)
+	fingerprintParts = append(fingerprintParts, fmt.Sprintf("%s=%v", "hash", hash))
+
+	return strings.Join(fingerprintParts, ";")
+}
diff --git a/exporter/clickhouselogsexporter/logsv2/fingerprint_test.go b/exporter/clickhouselogsexporter/logsv2/fingerprint_test.go
new file mode 100644
index 00000000..65f55086
--- /dev/null
+++ b/exporter/clickhouselogsexporter/logsv2/fingerprint_test.go
@@ -0,0 +1,41 @@
+package logsv2
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestCalculateFingerprint(t *testing.T) {
+	testCases := []struct {
+		Name          string
+		ResourceAttrs map[string]any
+		FingerPrint   string
+	}{
+		{
+			Name:          "Random resource attr",
+			ResourceAttrs: map[string]any{"a": "b"},
+			FingerPrint:   "hash=15182603570120227210",
+		},
+		{
+			Name:          "Few attrs from the hierarchy",
+			ResourceAttrs: map[string]any{"ec2.tag.env": "fn-prod", "host.image.id": "ami-fce3c696"},
+			FingerPrint:   "ec2.tag.env=fn-prod;hash=5580615729524003981",
+		},
+		{
+			Name:          "More than one attrs from the hierarchy",
+			ResourceAttrs: map[string]any{"cloudwatch.log.stream": "mystr", "ec2.tag.env": "fn-prod", "host.image.id": "ami-fce3c696"},
+			FingerPrint:   "ec2.tag.env=fn-prod;cloudwatch.log.stream=mystr;hash=10649409385811604510",
+		},
+		{
+			Name:          "Vector and gcp",
+			ResourceAttrs: map[string]any{"gcp.project": "myproject", "source_type": "gcp", "random_key": "val"},
+			FingerPrint:   "gcp.project=myproject;source_type=gcp;hash=11162778839006855273",
+		},
+	}
+
+	for _, ts := range testCases {
+		res := CalculateFingerprint(ts.ResourceAttrs, ResourceHierarchy())
+		assert.Equal(t, ts.FingerPrint, res)
+	}
+}
diff --git a/exporter/clickhouselogsexporter/logsv2/hash.go b/exporter/clickhouselogsexporter/logsv2/hash.go
new file mode 100644
index 00000000..6592e473
--- /dev/null
+++ b/exporter/clickhouselogsexporter/logsv2/hash.go
@@ -0,0 +1,52 @@
+package logsv2
+
+import (
+	"fmt"
+	"sort"
+)
+
+const (
+	offset64      uint64 = 14695981039346656037
+	prime64       uint64 = 1099511628211
+	separatorByte byte   = 255
+)
+
+// hashAdd adds a string to a fnv64a hash value, returning the updated hash.
+func hashAdd(h uint64, s string) uint64 {
+	for i := 0; i < len(s); i++ {
+		h ^= uint64(s[i])
+		h *= prime64
+	}
+	return h
+}
+
+// hashAddByte adds a byte to a fnv64a hash value, returning the updated hash.
+func hashAddByte(h uint64, b byte) uint64 {
+	h ^= uint64(b)
+	h *= prime64
+	return h
+}
+
+// FingerprintHash calculates a fingerprint of SORTED BY NAME labels.
+// It is adapted from labelSetToFingerprint, but avoids type conversions and memory allocations.
+func FingerprintHash(attribs map[string]any) uint64 {
+	if len(attribs) == 0 {
+		return offset64
+	}
+
+	keys := []string{}
+	for k := range attribs {
+		keys = append(keys, k)
+	}
+
+	sort.Strings(keys)
+
+	sum := offset64
+	for _, k := range keys {
+		sum = hashAdd(sum, k)
+		sum = hashAddByte(sum, separatorByte)
+		sum = hashAdd(sum, fmt.Sprintf("%v", attribs[k]))
+		sum = hashAddByte(sum, separatorByte)
+	}
+	return sum
+}
diff --git a/exporter/clickhouselogsexporter/testdata/config.yaml b/exporter/clickhouselogsexporter/testdata/config.yaml
index f209f571..21313296 100644
--- a/exporter/clickhouselogsexporter/testdata/config.yaml
+++ b/exporter/clickhouselogsexporter/testdata/config.yaml
@@ -19,6 +19,9 @@ exporters:
       multiplier: 1.3
     sending_queue:
       queue_size: 100
+  clickhouselogsexporter/new_schema:
+    dsn: tcp://127.0.0.1:9000/?dial_timeout=5s
+    use_new_schema: true
 
 service:
   pipelines:
diff --git a/exporter/clickhousemetricsexporter/clickhouse.go b/exporter/clickhousemetricsexporter/clickhouse.go
index 21407e2d..9b58300a 100644
--- a/exporter/clickhousemetricsexporter/clickhouse.go
+++ b/exporter/clickhousemetricsexporter/clickhouse.go
@@ -75,6 +75,7 @@ type clickHouse struct {
 	prevShardCount  uint64
 	watcherInterval time.Duration
 	writeTSToV4     bool
+	disableV2       bool
 
 	mWrittenTimeSeries prometheus.Counter
 
@@ -89,6 +90,7 @@ type ClickHouseParams struct {
 	MaxTimeSeriesInQuery int
 	WatcherInterval      time.Duration
 	WriteTSToV4          bool
+	DisableV2            bool
 	ExporterId           uuid.UUID
 }
 
@@ -139,6 +141,7 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
 		}),
 		watcherInterval: params.WatcherInterval,
 		writeTSToV4:     params.WriteTSToV4,
+		disableV2:       params.DisableV2,
 		exporterID:      params.ExporterId,
 	}
 
@@ -267,6 +270,9 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
 	ch.timeSeriesRW.Unlock()
 
 	err := func() error {
+		if ch.disableV2 {
+			return nil
+		}
 		statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (metric_name, temporality, timestamp_ms, fingerprint, labels, description, unit, type, is_monotonic) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE), driver.WithReleaseConnection())
 		if err != nil {
 			return err
@@ -306,6 +312,9 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
 	}
 
 	err = func() error {
+		if ch.disableV2 {
+			return nil
+		}
 		ctx := context.Background()
 
 		statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", ch.database, DISTRIBUTED_SAMPLES_TABLE), driver.WithReleaseConnection())
diff --git a/exporter/clickhousemetricsexporter/config.go b/exporter/clickhousemetricsexporter/config.go
index 736e35aa..d362ed6a 100644
--- a/exporter/clickhousemetricsexporter/config.go
+++ b/exporter/clickhousemetricsexporter/config.go
@@ -49,6 +49,7 @@ type Config struct {
 
 	WatcherInterval time.Duration `mapstructure:"watcher_interval"`
 	WriteTSToV4     bool          `mapstructure:"write_ts_to_v4"`
+	DisableV2       bool          `mapstructure:"disable_v2"`
 }
 
 // RemoteWriteQueue allows to configure the remote write queue.
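
The fingerprinting pieces above compose as follows: Identifier walks the hierarchy top-down picking the first matching label at each level, and FingerprintHash appends an order-insensitive FNV-1a hash of all attributes. A minimal sketch using the helpers this patch adds (the attribute values here are hypothetical):

package main

import (
	"fmt"

	"github.com/SigNoz/signoz-otel-collector/exporter/clickhouselogsexporter/logsv2"
)

func main() {
	// hypothetical resource attributes for one pod of a deployment
	attrs := map[string]any{
		"k8s.cluster.name":    "prod",
		"k8s.namespace.name":  "shop",
		"k8s.deployment.name": "webserver",
		"k8s.pod.name":        "webserver-0",
	}
	// prints "k8s.cluster.name=prod;k8s.namespace.name=shop;k8s.deployment.name=webserver;k8s.pod.name=webserver-0;hash=<fnv64a>",
	// so sorting fingerprints groups pods under their deployment, deployments under their namespace, and so on
	fmt.Println(logsv2.CalculateFingerprint(attrs, logsv2.ResourceHierarchy()))
}

Because FingerprintHash sorts keys before hashing, the trailing hash=... component is stable regardless of attribute ordering, while the label prefix follows the hierarchy's preference order.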
diff --git a/exporter/clickhousemetricsexporter/config_test.go b/exporter/clickhousemetricsexporter/config_test.go
index 29f0a6ee..c6c562ca 100644
--- a/exporter/clickhousemetricsexporter/config_test.go
+++ b/exporter/clickhousemetricsexporter/config_test.go
@@ -87,6 +87,7 @@ func Test_loadConfig(t *testing.T) {
 			ResourceToTelemetrySettings: resourcetotelemetry.Settings{Enabled: true},
 			WatcherInterval:             30 * time.Second,
 			WriteTSToV4:                 true,
+			DisableV2:                   false,
 		})
 }
 
diff --git a/exporter/clickhousemetricsexporter/exporter.go b/exporter/clickhousemetricsexporter/exporter.go
index 2b25ef1e..43263b5a 100644
--- a/exporter/clickhousemetricsexporter/exporter.go
+++ b/exporter/clickhousemetricsexporter/exporter.go
@@ -92,6 +92,7 @@ func NewPrwExporter(cfg *Config, set exporter.CreateSettings) (*PrwExporter, err
 		MaxTimeSeriesInQuery: 50,
 		WatcherInterval:      cfg.WatcherInterval,
 		WriteTSToV4:          cfg.WriteTSToV4,
+		DisableV2:            cfg.DisableV2,
 		ExporterId:           id,
 	}
 	ch, err := NewClickHouse(params)
diff --git a/exporter/clickhousemetricsexporter/factory.go b/exporter/clickhousemetricsexporter/factory.go
index eda2e0bb..67a418bb 100644
--- a/exporter/clickhousemetricsexporter/factory.go
+++ b/exporter/clickhousemetricsexporter/factory.go
@@ -126,5 +126,6 @@ func createDefaultConfig() component.Config {
 		},
 		WatcherInterval: 30 * time.Second,
 		WriteTSToV4:     true,
+		DisableV2:       false,
 	}
 }
diff --git a/exporter/clickhousetracesexporter/clickhouse_exporter.go b/exporter/clickhousetracesexporter/clickhouse_exporter.go
index 8b795314..8419249c 100644
--- a/exporter/clickhousetracesexporter/clickhouse_exporter.go
+++ b/exporter/clickhousetracesexporter/clickhouse_exporter.go
@@ -206,9 +206,9 @@ func populateOtherDimensions(attributes pcommon.Map, span *Span) {
 			span.MsgOperation = v.Str()
 		} else if k == "db.system" {
 			span.DBSystem = v.Str()
-		} else if k == "db.name" {
+		} else if k == "db.name" || k == "db.namespace" {
 			span.DBName = v.Str()
-		} else if k == "db.operation" {
+		} else if k == "db.operation" || k == "db.operation.name" {
 			span.DBOperation = v.Str()
 		} else if k == "peer.service" {
 			span.PeerService = v.Str()
diff --git a/migrationmanager/migrators/logs/migrations/000011_add_instrumentation_scope.up.sql b/migrationmanager/migrators/logs/migrations/000011_add_instrumentation_scope.up.sql
index 2e908e57..3f65f18b 100644
--- a/migrationmanager/migrators/logs/migrations/000011_add_instrumentation_scope.up.sql
+++ b/migrationmanager/migrators/logs/migrations/000011_add_instrumentation_scope.up.sql
@@ -11,7 +11,6 @@
 -- ALTER TABLE signoz_logs.distributed_logs ON CLUSTER {{.SIGNOZ_CLUSTER}} ADD column IF NOT EXISTS instrumentation_scope_attributes_string_value Array(String) CODEC(ZSTD(1))
 -- ALTER TABLE signoz_logs.logs ON CLUSTER {{.SIGNOZ_CLUSTER}} ADD INDEX IF NOT EXISTS instrumentation_scope_idx (instrumentation_scope) TYPE tokenbf_v1(10240, 3, 0) GRANULARITY 4
 
--- Don't run the commands below
 ALTER TABLE signoz_logs.tag_attributes ON CLUSTER {{.SIGNOZ_CLUSTER}} modify column tagType Enum8('tag', 'resource', 'scope') CODEC(ZSTD(1));
 ALTER TABLE signoz_logs.distributed_tag_attributes ON CLUSTER {{.SIGNOZ_CLUSTER}} modify column tagType Enum8('tag', 'resource', 'scope') CODEC(ZSTD(1));
diff --git a/migrationmanager/migrators/logs/migrations/000014_new_schema.down.sql b/migrationmanager/migrators/logs/migrations/000014_new_schema.down.sql
new file mode 100644
index 00000000..6f6531de
--- /dev/null
+++ b/migrationmanager/migrators/logs/migrations/000014_new_schema.down.sql
@@ -0,0 +1,39 @@
+DROP TABLE IF EXISTS signoz_logs.resource_keys_string_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}};
+DROP TABLE IF EXISTS signoz_logs.attribute_keys_float64_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}};
+DROP TABLE IF EXISTS signoz_logs.attribute_keys_string_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}};
+DROP TABLE IF EXISTS signoz_logs.attribute_keys_bool_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}};
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS attribute_keys_string_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_attribute_keys AS
+SELECT
+distinct arrayJoin(attributes_string_key) as name, 'String' datatype
+FROM signoz_logs.logs
+ORDER BY name;
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS attribute_keys_int64_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_attribute_keys AS
+SELECT
+distinct arrayJoin(attributes_int64_key) as name, 'Int64' datatype
+FROM signoz_logs.logs
+ORDER BY name;
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS attribute_keys_float64_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_attribute_keys AS
+SELECT
+distinct arrayJoin(attributes_float64_key) as name, 'Float64' datatype
+FROM signoz_logs.logs
+ORDER BY name;
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS signoz_logs.attribute_keys_bool_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_attribute_keys AS
+SELECT
+distinct arrayJoin(attributes_bool_key) as name, 'Bool' datatype
+FROM signoz_logs.logs
+ORDER BY name;
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS resource_keys_string_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_resource_keys AS
+SELECT
+distinct arrayJoin(resources_string_key) as name, 'String' datatype
+FROM signoz_logs.logs
+ORDER BY name;
+
+DROP TABLE IF EXISTS signoz_logs.logs_v2_resource ON CLUSTER {{.SIGNOZ_CLUSTER}};
+DROP TABLE IF EXISTS signoz_logs.distributed_logs_v2_resource ON CLUSTER {{.SIGNOZ_CLUSTER}};
+DROP TABLE IF EXISTS signoz_logs.logs_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}};
+DROP TABLE IF EXISTS signoz_logs.distributed_logs_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}};
diff --git a/migrationmanager/migrators/logs/migrations/000014_new_schema.up.sql b/migrationmanager/migrators/logs/migrations/000014_new_schema.up.sql
new file mode 100644
index 00000000..87597c1b
--- /dev/null
+++ b/migrationmanager/migrators/logs/migrations/000014_new_schema.up.sql
@@ -0,0 +1,117 @@
+CREATE TABLE IF NOT EXISTS signoz_logs.logs_v2_resource ON CLUSTER {{.SIGNOZ_CLUSTER}}
+(
+    `labels` String CODEC(ZSTD(5)),
+    `fingerprint` String CODEC(ZSTD(1)),
+    `seen_at_ts_bucket_start` Int64 CODEC(Delta(8), ZSTD(1)),
+    INDEX idx_labels lower(labels) TYPE ngrambf_v1(4, 1024, 3, 0) GRANULARITY 1
+)
+ENGINE = ReplicatedReplacingMergeTree
+PARTITION BY toDate(seen_at_ts_bucket_start / 1000)
+ORDER BY (labels, fingerprint, seen_at_ts_bucket_start)
+TTL toDateTime(seen_at_ts_bucket_start) + INTERVAL 1296000 SECOND + INTERVAL 1800 SECOND DELETE
+SETTINGS ttl_only_drop_parts = 1, index_granularity = 8192;
+
+
+CREATE TABLE IF NOT EXISTS signoz_logs.distributed_logs_v2_resource ON CLUSTER {{.SIGNOZ_CLUSTER}}
+(
+    `labels` String CODEC(ZSTD(5)),
+    `fingerprint` String CODEC(ZSTD(1)),
+    `seen_at_ts_bucket_start` Int64 CODEC(Delta(8), ZSTD(1))
+)
+ENGINE = Distributed({{.SIGNOZ_CLUSTER}}, 'signoz_logs', 'logs_v2_resource', cityHash64(labels, fingerprint));
+
+
+CREATE TABLE IF NOT EXISTS signoz_logs.logs_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}}
+(
+    `ts_bucket_start` UInt64 CODEC(DoubleDelta, LZ4),
+    `resource_fingerprint` String CODEC(ZSTD(1)),
+    `timestamp` UInt64 CODEC(DoubleDelta, LZ4),
+    `observed_timestamp` UInt64 CODEC(DoubleDelta, LZ4),
+    `id` String CODEC(ZSTD(1)),
+    `trace_id` String CODEC(ZSTD(1)),
+    `span_id` String CODEC(ZSTD(1)),
+    `trace_flags` UInt32,
+    `severity_text` LowCardinality(String) CODEC(ZSTD(1)),
+    `severity_number` UInt8,
+    `body` String CODEC(ZSTD(2)),
+    `attributes_string` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
+    `attributes_number` Map(LowCardinality(String), Float64) CODEC(ZSTD(1)),
+    `attributes_bool` Map(LowCardinality(String), Bool) CODEC(ZSTD(1)),
+    `resources_string` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
+    `scope_name` String CODEC(ZSTD(1)),
+    `scope_version` String CODEC(ZSTD(1)),
+    `scope_string` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
+    INDEX body_idx lower(body) TYPE ngrambf_v1(4, 60000, 5, 0) GRANULARITY 1,
+    INDEX id_minmax id TYPE minmax GRANULARITY 1,
+    INDEX severity_number_idx severity_number TYPE set(25) GRANULARITY 4,
+    INDEX severity_text_idx severity_text TYPE set(25) GRANULARITY 4,
+    INDEX trace_flags_idx trace_flags TYPE bloom_filter GRANULARITY 4,
+    INDEX scope_name_idx scope_name TYPE tokenbf_v1(10240, 3, 0) GRANULARITY 4,
+    INDEX attributes_string_idx_key mapKeys(attributes_string) TYPE tokenbf_v1(1024, 2, 0) GRANULARITY 1,
+    INDEX attributes_string_idx_val mapValues(attributes_string) TYPE ngrambf_v1(4, 5000, 2, 0) GRANULARITY 1,
+    INDEX attributes_int64_idx_key mapKeys(attributes_number) TYPE tokenbf_v1(1024, 2, 0) GRANULARITY 1,
+    INDEX attributes_int64_idx_val mapValues(attributes_number) TYPE bloom_filter GRANULARITY 1,
+    INDEX attributes_bool_idx_key mapKeys(attributes_bool) TYPE tokenbf_v1(1024, 2, 0) GRANULARITY 1
+)
+ENGINE = ReplicatedMergeTree
+PARTITION BY toDate(timestamp / 1000000000)
+ORDER BY (ts_bucket_start, resource_fingerprint, severity_text, timestamp, id)
+TTL toDateTime(timestamp / 1000000000) + INTERVAL 1296000 SECOND DELETE
+SETTINGS ttl_only_drop_parts = 1, index_granularity = 8192;
+
+
+
+CREATE TABLE IF NOT EXISTS signoz_logs.distributed_logs_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}}
+(
+    `ts_bucket_start` UInt64 CODEC(DoubleDelta, LZ4),
+    `resource_fingerprint` String CODEC(ZSTD(1)),
+    `timestamp` UInt64 CODEC(DoubleDelta, LZ4),
+    `observed_timestamp` UInt64 CODEC(DoubleDelta, LZ4),
+    `id` String CODEC(ZSTD(1)),
+    `trace_id` String CODEC(ZSTD(1)),
+    `span_id` String CODEC(ZSTD(1)),
+    `trace_flags` UInt32,
+    `severity_text` LowCardinality(String) CODEC(ZSTD(1)),
+    `severity_number` UInt8,
+    `body` String CODEC(ZSTD(2)),
+    `attributes_string` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
+    `attributes_number` Map(LowCardinality(String), Float64) CODEC(ZSTD(1)),
+    `attributes_bool` Map(LowCardinality(String), Bool) CODEC(ZSTD(1)),
+    `resources_string` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
+    `scope_name` String CODEC(ZSTD(1)),
+    `scope_version` String CODEC(ZSTD(1)),
+    `scope_string` Map(LowCardinality(String), String) CODEC(ZSTD(1))
+)
+ENGINE = Distributed({{.SIGNOZ_CLUSTER}}, 'signoz_logs', 'logs_v2', cityHash64(id));
+
+-- remove the old mv
+DROP TABLE IF EXISTS signoz_logs.resource_keys_string_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}};
+DROP TABLE IF EXISTS signoz_logs.attribute_keys_float64_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}};
+DROP TABLE IF EXISTS signoz_logs.attribute_keys_int64_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}};
+DROP TABLE IF EXISTS signoz_logs.attribute_keys_string_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}};
+DROP TABLE IF EXISTS signoz_logs.attribute_keys_bool_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}};
+
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS attribute_keys_string_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_attribute_keys AS
+SELECT
+distinct arrayJoin(mapKeys(attributes_string)) as name, 'String' datatype
+FROM signoz_logs.logs_v2
+ORDER BY name;
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS attribute_keys_float64_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_attribute_keys AS
+SELECT
+distinct arrayJoin(mapKeys(attributes_number)) as name, 'Float64' datatype
+FROM signoz_logs.logs_v2
+ORDER BY name;
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS signoz_logs.attribute_keys_bool_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_attribute_keys AS
+SELECT
+distinct arrayJoin(mapKeys(attributes_bool)) as name, 'Bool' datatype
+FROM signoz_logs.logs_v2
+ORDER BY name;
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS resource_keys_string_final_mv ON CLUSTER {{.SIGNOZ_CLUSTER}} TO signoz_logs.logs_resource_keys AS
+SELECT
+distinct arrayJoin(mapKeys(resources_string)) as name, 'String' datatype
+FROM signoz_logs.logs_v2
+ORDER BY name;
\ No newline at end of file
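
Both v2 tables key on the same 1800-second bucket that exporter.go computes with tsBucket: ts_bucket_start in logs_v2 and seen_at_ts_bucket_start in logs_v2_resource come from the same floor operation. A standalone sketch of that arithmetic (the timestamp value is hypothetical):

package main

import "fmt"

// tsBucket mirrors the helper added in exporter.go: it floors a
// unix-seconds timestamp to the start of its bucket.
func tsBucket(ts int64, bucketSize int64) int64 {
	return (ts / bucketSize) * bucketSize
}

func main() {
	ts := int64(1704164645) // 2024-01-02T03:04:05Z
	// prints 1704164400, the 03:00:00 bucket start stored in
	// ts_bucket_start and joined against seen_at_ts_bucket_start
	fmt.Println(tsBucket(ts, 1800))
}

This is presumably also why the resource table's TTL adds an extra INTERVAL 1800 SECOND on top of the base retention: resource rows for a bucket must outlive the newest log row that can still reference that bucket, which can be up to one full bucket width after the bucket start.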