diff --git a/exporter/clickhouselogsexporter/exporter.go b/exporter/clickhouselogsexporter/exporter.go index 15122c3f..ef55c529 100644 --- a/exporter/clickhouselogsexporter/exporter.go +++ b/exporter/clickhouselogsexporter/exporter.go @@ -288,6 +288,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L scopeVersion := scope.Version() scopeAttributes := attributesToSlice(scope.Attributes(), true) + scopeMap := attributesToMap(scope.Attributes(), true) err := addAttrsToTagStatement(tagStatement, "scope", scopeAttributes) if err != nil { @@ -353,8 +354,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L resourcesMap.StringData, scopeName, scopeVersion, - scopeAttributes.StringKeys, - scopeAttributes.StringValues, + scopeMap.StringData, ) if err != nil { return fmt.Errorf("StatementAppendLogsV2:%w", err) @@ -413,13 +413,12 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L } } - var errResource error dbWriteStart := time.Now() err = statement.Send() // insert to the new table errV2 := insertLogsStmtV2.Send() // insert into the resource bucket table - errResource = insertResourcesStmtV2.Send() + errResource := insertResourcesStmtV2.Send() if err != nil { return fmt.Errorf("couldn't send batch insert resources statement:%w", err) } @@ -560,7 +559,6 @@ func attributesToMap(attributes pcommon.Map, forceStringValues bool) (response a response.StringData = map[string]string{} response.NumberData = map[string]float64{} attributes.Range(func(k string, v pcommon.Value) bool { - // Support for . , remove the above section once done. if forceStringValues { // store everything as string response.StringData[k] = v.AsString() @@ -584,7 +582,6 @@ func attributesToMap(attributes pcommon.Map, forceStringValues bool) (response a func attributesToSlice(attributes pcommon.Map, forceStringValues bool) (response attributesToSliceResponse) { attributes.Range(func(k string, v pcommon.Value) bool { - // Support for . , remove the above section once done. if forceStringValues { // store everything as string response.StringKeys = append(response.StringKeys, k) @@ -719,8 +716,7 @@ const ( resources_string, scope_name, scope_version, - scope_string_key, - scope_string_value + scope_string ) VALUES ( ?, ?, @@ -739,7 +735,6 @@ const ( ?, ?, ?, - ?, ? )` ) diff --git a/exporter/clickhouselogsexporter/logsv2/fingerprint.go b/exporter/clickhouselogsexporter/logsv2/fingerprint.go index 79c9a4de..dfe86f73 100644 --- a/exporter/clickhouselogsexporter/logsv2/fingerprint.go +++ b/exporter/clickhouselogsexporter/logsv2/fingerprint.go @@ -7,7 +7,6 @@ import ( type DimensionHierarchyNode struct { // labels that map to this node in dimension hierarchy - // TODO(Raj): Can labels be a concat of 2 keys? labels []string // List of potential subhierachies in the order of preference. 
@@ -57,115 +56,118 @@ func ResourceHierarchy() *DimensionHierarchyNode { labels: []string{"cloud.account.id"}, subHierachies: []DimensionHierarchyNode{{ - labels: []string{ - "cloud.region", - "aws.region", - }, + labels: []string{"gcp.project"}, subHierachies: []DimensionHierarchyNode{{ - labels: []string{"cloud.platform"}, + labels: []string{ + "cloud.region", + "aws.region", + }, subHierachies: []DimensionHierarchyNode{{ - labels: []string{ - "k8s.cluster.name", - "k8s.cluster.uid", - "aws.ecs.cluster.arn", - "gcp.project", - }, - - subHierachies: []DimensionHierarchyNode{ - // Logical/service oriented view - { - labels: []string{ - "service.namespace", - "k8s.namespace.name", - "ec2.tag.service-group", // is this standard enough? - }, + labels: []string{"cloud.platform"}, + + subHierachies: []DimensionHierarchyNode{{ + labels: []string{ + "k8s.cluster.name", + "k8s.cluster.uid", + "aws.ecs.cluster.arn", + }, - subHierachies: []DimensionHierarchyNode{{ + subHierachies: []DimensionHierarchyNode{ + // Logical/service oriented view + { labels: []string{ - "service.name", - "cloudwatch.log.group.name", - "k8s.deployment.name", - "k8s.deployment.uid", - "k8s.statefulset.name", - "k8s.statefulset.uid", - "k8s.daemonset.name", - "k8s.daemonset.uid", - "k8s.job.name", - "k8s.job.uid", - "k8s.cronjob.name", - "k8s.cronjob.uid", - "faas.name", - "ec2.tag.service", // is this standard enough? + "service.namespace", + "k8s.namespace.name", + "ec2.tag.service-group", // is this standard enough? }, subHierachies: []DimensionHierarchyNode{{ labels: []string{ - "deployment.environment", - "ec2.tag.env-short", // is this standard enough? - "ec2.tag.env", // is this standard enough? + "service.name", + "cloudwatch.log.group.name", + "k8s.deployment.name", + "k8s.deployment.uid", + "k8s.statefulset.name", + "k8s.statefulset.uid", + "k8s.daemonset.name", + "k8s.daemonset.uid", + "k8s.job.name", + "k8s.job.uid", + "k8s.cronjob.name", + "k8s.cronjob.uid", + "faas.name", + "ec2.tag.service", // is this standard enough? }, subHierachies: []DimensionHierarchyNode{{ labels: []string{ - "service.instance.id", - "k8s.pod.name", - "k8s.pod.uid", - "aws.ecs.task.id", - "aws.ecs.task.arn", - "cloudwatch.log.stream", - "cloud.resource_id", - "faas.instance", - "host.id", - "host.name", - "host.ip", - "host", + "deployment.environment", + "ec2.tag.env-short", // is this standard enough? + "ec2.tag.env", // is this standard enough? 
 							},
 
 							subHierachies: []DimensionHierarchyNode{{
 								labels: []string{
-									"k8s.container.name",
-									"container.name",
-									"container_name",
+									"service.instance.id",
+									"k8s.pod.name",
+									"k8s.pod.uid",
+									"aws.ecs.task.id",
+									"aws.ecs.task.arn",
+									"cloudwatch.log.stream",
+									"cloud.resource_id",
+									"faas.instance",
+									"host.id",
+									"host.name",
+									"host.ip",
+									"host",
 								},
+
+								subHierachies: []DimensionHierarchyNode{{
+									labels: []string{
+										"k8s.container.name",
+										"container.name",
+										"container_name",
+									},
+								}},
 							}},
 						}},
 					}},
-				}},
-			},
+				},
 
-			// Node oriented view
-			{
-				labels: []string{"cloud.availability_zone"},
-
-				subHierachies: []DimensionHierarchyNode{{
-					labels: []string{
-						"k8s.node.name",
-						"k8s.node.uid",
-						"host.id",
-						"host.name",
-						"host.ip",
-						"host",
-					},
+				// Node oriented view
+				{
+					labels: []string{"cloud.availability_zone"},
 
 					subHierachies: []DimensionHierarchyNode{{
 						labels: []string{
-							"k8s.pod.name",
-							"k8s.pod.uid",
-							"aws.ecs.task.id",
-							"aws.ecs.task.arn",
+							"k8s.node.name",
+							"k8s.node.uid",
+							"host.id",
+							"host.name",
+							"host.ip",
+							"host",
 						},
 
 						subHierachies: []DimensionHierarchyNode{{
 							labels: []string{
-								"k8s.container.name",
-								"container.name",
+								"k8s.pod.name",
+								"k8s.pod.uid",
+								"aws.ecs.task.id",
+								"aws.ecs.task.arn",
 							},
+
+							subHierachies: []DimensionHierarchyNode{{
+								labels: []string{
+									"k8s.container.name",
+									"container.name",
+								},
+							}},
 						}},
 					}},
 				}},
-			}},
+				}},
 			}},
 		}},
 	}},
@@ -176,6 +178,8 @@ func ResourceHierarchy() *DimensionHierarchyNode {
 // Calculates fingerprint for attributes that when sorted would keep fingerprints
 // for the same set of attributes next to each other while also colocating
 // entries at all levels of the hierarchy
+// For example, a fingerprint like "k8s.deployment.name=webserver;k8s.pod.name=webserver-0"
+// keeps logs for each webserver pod together while also colocating all logs for the webserver deployment
 func CalculateFingerprint(
 	attributes map[string]any, hierarchy *DimensionHierarchyNode,
 ) string {
diff --git a/exporter/clickhouselogsexporter/logsv2/fingerprint_test.go b/exporter/clickhouselogsexporter/logsv2/fingerprint_test.go
index 3cc14e0c..5a61ea07 100644
--- a/exporter/clickhouselogsexporter/logsv2/fingerprint_test.go
+++ b/exporter/clickhouselogsexporter/logsv2/fingerprint_test.go
@@ -29,7 +29,7 @@ func TestCalculateFingerprint(t *testing.T) {
 		},
 		{
 			Name:          "Vector and gcp",
-			ResourceAttrs: map[string]any{"gcp.project": "myproject", "source": "gcp", "random_key": "val"},
+			ResourceAttrs: map[string]any{"gcp.project": "myproject", "source_type": "gcp", "random_key": "val"},
 			FingerPrint:   "source_type=gcp;gcp.project=myproject;hash=11162778839006855273",
 		},
 	}
diff --git a/migrationmanager/migrators/logs/migrations/000014_new_schema.up.sql b/migrationmanager/migrators/logs/migrations/000014_new_schema.up.sql
index f1096410..e8f60399 100644
--- a/migrationmanager/migrators/logs/migrations/000014_new_schema.up.sql
+++ b/migrationmanager/migrators/logs/migrations/000014_new_schema.up.sql
@@ -39,8 +39,7 @@ CREATE TABLE IF NOT EXISTS signoz_logs.logs_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}}
 	`resources_string` Map(String, String) CODEC(ZSTD(1)),
 	`scope_name` String CODEC(ZSTD(1)),
 	`scope_version` String CODEC(ZSTD(1)),
-	`scope_string_key` Array(String) CODEC(ZSTD(1)),
-	`scope_string_value` Array(String) CODEC(ZSTD(1)),
+	`scope_string` Map(String, String) CODEC(ZSTD(1)),
 	INDEX body_idx lower(body) TYPE ngrambf_v1(4, 60000, 5, 0) GRANULARITY 1,
 	INDEX id_minmax id TYPE minmax GRANULARITY 1,
 	INDEX severity_number_idx severity_number TYPE set(25) GRANULARITY 4,
@@ -79,8 +78,7 @@ CREATE TABLE IF NOT EXISTS signoz_logs.distributed_logs_v2 ON CLUSTER {{.SIGNOZ
 	`resources_string` Map(String, String) CODEC(ZSTD(1)),
 	`scope_name` String CODEC(ZSTD(1)),
 	`scope_version` String CODEC(ZSTD(1)),
-	`scope_string_key` Array(String) CODEC(ZSTD(1)),
-	`scope_string_value` Array(String) CODEC(ZSTD(1))
+	`scope_string` Map(String, String) CODEC(ZSTD(1))
 ) ENGINE = Distributed('cluster', 'signoz_logs', 'logs_v2', cityHash64(id));
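Note on the scope_string change above: scope attributes are now flattened into a single Map(String, String) column instead of the parallel scope_string_key/scope_string_value arrays. The following is a minimal sketch of that flattening, assuming the collector's pdata pcommon API; flattenToStringMap is a hypothetical stand-in for the forceStringValues branch of attributesToMap, not the exporter's actual helper.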
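package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pcommon"
)

// flattenToStringMap mirrors the forceStringValues branch of attributesToMap:
// every value is rendered as a string so the result can bind to a ClickHouse
// Map(String, String) column such as scope_string.
func flattenToStringMap(attrs pcommon.Map) map[string]string {
	out := make(map[string]string, attrs.Len())
	attrs.Range(func(k string, v pcommon.Value) bool {
		out[k] = v.AsString() // AsString stringifies ints, doubles, bools, maps, etc.
		return true           // keep iterating
	})
	return out
}

func main() {
	scopeAttrs := pcommon.NewMap()
	scopeAttrs.PutStr("otel.scope.short_name", "contrib") // hypothetical attributes
	scopeAttrs.PutInt("otel.scope.retries", 3)

	// A single map now replaces the old scope_string_key/scope_string_value pair.
	fmt.Println(flattenToStringMap(scopeAttrs))
}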
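Note on fingerprinting: CalculateFingerprint walks the DimensionHierarchyNode tree so that broader dimensions (cluster, namespace, service) precede narrower ones (pod, container) in the rendered fingerprint, which keeps related resources adjacent once fingerprints are sorted. The sketch below illustrates one way such a walk could work; Node, parts, fingerprint, the first-match-wins descent, and the trailing FNV-64a hash are all assumptions for illustration, not the exporter's implementation.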
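package main

import (
	"fmt"
	"hash/fnv"
	"sort"
	"strings"
)

// Node mirrors the shape of DimensionHierarchyNode in logsv2/fingerprint.go.
type Node struct {
	labels []string
	subs   []Node
}

// parts walks the hierarchy top-down: the first label present at each level
// is emitted, so broader dimensions always precede narrower ones.
func parts(attrs map[string]any, n *Node) []string {
	if n == nil {
		return nil
	}
	var out []string
	for _, label := range n.labels {
		if v, ok := attrs[label]; ok {
			out = append(out, fmt.Sprintf("%s=%v", label, v))
			break // first matching label at a level wins
		}
	}
	for i := range n.subs {
		if sub := parts(attrs, &n.subs[i]); len(sub) > 0 {
			out = append(out, sub...)
			break // first sub-hierarchy that contributes wins
		}
	}
	return out
}

// fingerprint appends a trailing hash over all attributes so resources that
// differ only in unranked labels still get distinct fingerprints.
func fingerprint(attrs map[string]any, root *Node) string {
	kvs := make([]string, 0, len(attrs))
	for k, v := range attrs {
		kvs = append(kvs, fmt.Sprintf("%s=%v", k, v))
	}
	sort.Strings(kvs) // stable input for hashing
	h := fnv.New64a()
	h.Write([]byte(strings.Join(kvs, ";")))

	return strings.Join(append(parts(attrs, root), fmt.Sprintf("hash=%d", h.Sum64())), ";")
}

func main() {
	// An assumed two-level slice of the hierarchy: source_type above gcp.project.
	root := &Node{
		labels: []string{"source_type"},
		subs:   []Node{{labels: []string{"gcp.project"}}},
	}
	attrs := map[string]any{"gcp.project": "myproject", "source_type": "gcp", "random_key": "val"}
	fmt.Println(fingerprint(attrs, root))
	// Prints something like: source_type=gcp;gcp.project=myproject;hash=...
}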
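Note on the statement change: because scope_string is a Map(String, String), a single placeholder now binds the whole scope map where two array arguments were needed before. A minimal sketch of that binding, assuming clickhouse-go v2 and a local server; the address, the reduced column list, and the values are illustrative only (the real exporter appends the full logs_v2 column list).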
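package main

import (
	"context"
	"log"

	"github.com/ClickHouse/clickhouse-go/v2"
)

func main() {
	// Assumes a local ClickHouse; connection details are illustrative.
	conn, err := clickhouse.Open(&clickhouse.Options{Addr: []string{"127.0.0.1:9000"}})
	if err != nil {
		log.Fatal(err)
	}

	// Only a subset of logs_v2 columns is named here for brevity; omitted
	// columns fall back to their ClickHouse defaults.
	batch, err := conn.PrepareBatch(context.Background(),
		"INSERT INTO signoz_logs.distributed_logs_v2 (scope_name, scope_version, scope_string)")
	if err != nil {
		log.Fatal(err)
	}

	// clickhouse-go v2 binds a Go map[string]string directly to a
	// Map(String, String) column, so the whole scope map is one argument.
	err = batch.Append(
		"my.scope", // scope_name
		"v1",       // scope_version
		map[string]string{"scope.attr": "value"}, // scope_string
	)
	if err != nil {
		log.Fatal(err)
	}
	if err := batch.Send(); err != nil {
		log.Fatal(err)
	}
}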