Skip to content

Commit

Permalink
feat: address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
nityanandagohain committed Aug 14, 2024
1 parent ebd3dc2 commit 736c441
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 90 deletions.
13 changes: 4 additions & 9 deletions exporter/clickhouselogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
scopeVersion := scope.Version()

scopeAttributes := attributesToSlice(scope.Attributes(), true)
scopeMap := attributesToMap(scope.Attributes(), true)

err := addAttrsToTagStatement(tagStatement, "scope", scopeAttributes)
if err != nil {
Expand Down Expand Up @@ -353,8 +354,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
resourcesMap.StringData,
scopeName,
scopeVersion,
scopeAttributes.StringKeys,
scopeAttributes.StringValues,
scopeMap.StringData,
)
if err != nil {
return fmt.Errorf("StatementAppendLogsV2:%w", err)
Expand Down Expand Up @@ -413,13 +413,12 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
}
}

var errResource error
dbWriteStart := time.Now()
err = statement.Send()
// insert to the new table
errV2 := insertLogsStmtV2.Send()
// insert into the resource bucket table
errResource = insertResourcesStmtV2.Send()
errResource := insertResourcesStmtV2.Send()
if err != nil {
return fmt.Errorf("couldn't send batch insert resources statement:%w", err)
}
Expand Down Expand Up @@ -560,7 +559,6 @@ func attributesToMap(attributes pcommon.Map, forceStringValues bool) (response a
response.StringData = map[string]string{}
response.NumberData = map[string]float64{}
attributes.Range(func(k string, v pcommon.Value) bool {
// Support for . , remove the above section once done.
if forceStringValues {
// store everything as string
response.StringData[k] = v.AsString()
Expand All @@ -584,7 +582,6 @@ func attributesToMap(attributes pcommon.Map, forceStringValues bool) (response a

func attributesToSlice(attributes pcommon.Map, forceStringValues bool) (response attributesToSliceResponse) {
attributes.Range(func(k string, v pcommon.Value) bool {
// Support for . , remove the above section once done.
if forceStringValues {
// store everything as string
response.StringKeys = append(response.StringKeys, k)
Expand Down Expand Up @@ -719,8 +716,7 @@ const (
resources_string,
scope_name,
scope_version,
scope_string_key,
scope_string_value
scope_string
) VALUES (
?,
?,
Expand All @@ -739,7 +735,6 @@ const (
?,
?,
?,
?,
?
)`
)
Expand Down
156 changes: 80 additions & 76 deletions exporter/clickhouselogsexporter/logsv2/fingerprint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

type DimensionHierarchyNode struct {
// labels that map to this node in dimension hierarchy
// TODO(Raj): Can labels be a concat of 2 keys?
labels []string

// List of potential subhierachies in the order of preference.
Expand Down Expand Up @@ -57,115 +56,118 @@ func ResourceHierarchy() *DimensionHierarchyNode {
labels: []string{"cloud.account.id"},

subHierachies: []DimensionHierarchyNode{{
labels: []string{
"cloud.region",
"aws.region",
},
labels: []string{"gcp.project"},

subHierachies: []DimensionHierarchyNode{{
labels: []string{"cloud.platform"},
labels: []string{
"cloud.region",
"aws.region",
},

subHierachies: []DimensionHierarchyNode{{
labels: []string{
"k8s.cluster.name",
"k8s.cluster.uid",
"aws.ecs.cluster.arn",
"gcp.project",
},

subHierachies: []DimensionHierarchyNode{
// Logical/service oriented view
{
labels: []string{
"service.namespace",
"k8s.namespace.name",
"ec2.tag.service-group", // is this standard enough?
},
labels: []string{"cloud.platform"},

subHierachies: []DimensionHierarchyNode{{
labels: []string{
"k8s.cluster.name",
"k8s.cluster.uid",
"aws.ecs.cluster.arn",
},

subHierachies: []DimensionHierarchyNode{{
subHierachies: []DimensionHierarchyNode{
// Logical/service oriented view
{
labels: []string{
"service.name",
"cloudwatch.log.group.name",
"k8s.deployment.name",
"k8s.deployment.uid",
"k8s.statefulset.name",
"k8s.statefulset.uid",
"k8s.daemonset.name",
"k8s.daemonset.uid",
"k8s.job.name",
"k8s.job.uid",
"k8s.cronjob.name",
"k8s.cronjob.uid",
"faas.name",
"ec2.tag.service", // is this standard enough?
"service.namespace",
"k8s.namespace.name",
"ec2.tag.service-group", // is this standard enough?
},

subHierachies: []DimensionHierarchyNode{{
labels: []string{
"deployment.environment",
"ec2.tag.env-short", // is this standard enough?
"ec2.tag.env", // is this standard enough?
"service.name",
"cloudwatch.log.group.name",
"k8s.deployment.name",
"k8s.deployment.uid",
"k8s.statefulset.name",
"k8s.statefulset.uid",
"k8s.daemonset.name",
"k8s.daemonset.uid",
"k8s.job.name",
"k8s.job.uid",
"k8s.cronjob.name",
"k8s.cronjob.uid",
"faas.name",
"ec2.tag.service", // is this standard enough?
},

subHierachies: []DimensionHierarchyNode{{
labels: []string{
"service.instance.id",
"k8s.pod.name",
"k8s.pod.uid",
"aws.ecs.task.id",
"aws.ecs.task.arn",
"cloudwatch.log.stream",
"cloud.resource_id",
"faas.instance",
"host.id",
"host.name",
"host.ip",
"host",
"deployment.environment",
"ec2.tag.env-short", // is this standard enough?
"ec2.tag.env", // is this standard enough?
},

subHierachies: []DimensionHierarchyNode{{
labels: []string{
"k8s.container.name",
"container.name",
"container_name",
"service.instance.id",
"k8s.pod.name",
"k8s.pod.uid",
"aws.ecs.task.id",
"aws.ecs.task.arn",
"cloudwatch.log.stream",
"cloud.resource_id",
"faas.instance",
"host.id",
"host.name",
"host.ip",
"host",
},

subHierachies: []DimensionHierarchyNode{{
labels: []string{
"k8s.container.name",
"container.name",
"container_name",
},
}},
}},
}},
}},
}},
},
},

// Node oriented view
{
labels: []string{"cloud.availability_zone"},

subHierachies: []DimensionHierarchyNode{{
labels: []string{
"k8s.node.name",
"k8s.node.uid",
"host.id",
"host.name",
"host.ip",
"host",
},
// Node oriented view
{
labels: []string{"cloud.availability_zone"},

subHierachies: []DimensionHierarchyNode{{
labels: []string{
"k8s.pod.name",
"k8s.pod.uid",
"aws.ecs.task.id",
"aws.ecs.task.arn",
"k8s.node.name",
"k8s.node.uid",
"host.id",
"host.name",
"host.ip",
"host",
},

subHierachies: []DimensionHierarchyNode{{
labels: []string{
"k8s.container.name",
"container.name",
"k8s.pod.name",
"k8s.pod.uid",
"aws.ecs.task.id",
"aws.ecs.task.arn",
},

subHierachies: []DimensionHierarchyNode{{
labels: []string{
"k8s.container.name",
"container.name",
},
}},
}},
}},
}},
}},
}},
}},
}},
}},
Expand All @@ -176,6 +178,8 @@ func ResourceHierarchy() *DimensionHierarchyNode {
// Calculates fingerprint for attributes that when sorted would keep fingerprints
// for the same set of attributes next to each other while also colocating
// entries at all levels of the hierarchy
// For example, fingerprints like "k8s.deployment.name=webserver;k8s.pod.name=webserver-0"
// will calculate logs for each webserver pod while also calculate all logs for the webserver deployment
func CalculateFingerprint(
attributes map[string]any, hierarchy *DimensionHierarchyNode,
) string {
Expand Down
2 changes: 1 addition & 1 deletion exporter/clickhouselogsexporter/logsv2/fingerprint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestCalculateFingerprint(t *testing.T) {
},
{
Name: "Vector and gcp",
ResourceAttrs: map[string]any{"gcp.project": "myproject", "source": "gcp", "random_key": "val"},
ResourceAttrs: map[string]any{"gcp.project": "myproject", "source_type": "gcp", "random_key": "val"},
FingerPrint: "source_type=gcp;gcp.project=myproject;hash=11162778839006855273",
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ CREATE TABLE IF NOT EXISTS signoz_logs.logs_v2 ON CLUSTER {{.SIGNOZ_CLUSTER}}
`resources_string` Map(String, String) CODEC(ZSTD(1)),
`scope_name` String CODEC(ZSTD(1)),
`scope_version` String CODEC(ZSTD(1)),
`scope_string_key` Array(String) CODEC(ZSTD(1)),
`scope_string_value` Array(String) CODEC(ZSTD(1)),
`scope_string` Map(String, String) CODEC(ZSTD(1)),
INDEX body_idx lower(body) TYPE ngrambf_v1(4, 60000, 5, 0) GRANULARITY 1,
INDEX id_minmax id TYPE minmax GRANULARITY 1,
INDEX severity_number_idx severity_number TYPE set(25) GRANULARITY 4,
Expand Down Expand Up @@ -79,8 +78,7 @@ CREATE TABLE IF NOT EXISTS signoz_logs.distributed_logs_v2 ON CLUSTER {{.SIGNOZ
`resources_string` Map(String, String) CODEC(ZSTD(1)),
`scope_name` String CODEC(ZSTD(1)),
`scope_version` String CODEC(ZSTD(1)),
`scope_string_key` Array(String) CODEC(ZSTD(1)),
`scope_string_value` Array(String) CODEC(ZSTD(1))
`scope_string` Map(String, String) CODEC(ZSTD(1))
)
ENGINE = Distributed('cluster', 'signoz_logs', 'logs_v2', cityHash64(id));

Expand Down

0 comments on commit 736c441

Please sign in to comment.