From 69ae34439e493751c3c61197c277457b7bf75e4a Mon Sep 17 00:00:00 2001 From: nityanandagohain Date: Tue, 27 Aug 2024 15:36:51 +0530 Subject: [PATCH 1/2] fix: write int64 tag attributes for old schema --- exporter/clickhouselogsexporter/config.go | 1 + exporter/clickhouselogsexporter/config_test.go | 6 +++++- exporter/clickhouselogsexporter/exporter.go | 18 +++++++++++++----- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/exporter/clickhouselogsexporter/config.go b/exporter/clickhouselogsexporter/config.go index 4eb49f2d..33b3f0c1 100644 --- a/exporter/clickhouselogsexporter/config.go +++ b/exporter/clickhouselogsexporter/config.go @@ -34,6 +34,7 @@ type Config struct { DSN string `mapstructure:"dsn"` // Docker Multi Node Cluster is a flag to enable the docker multi node cluster. Default is false. DockerMultiNodeCluster bool `mapstructure:"docker_multi_node_cluster" default:"false"` + UseNewSchema bool `mapstructure:"use_new_schema" default:"false"` } var ( diff --git a/exporter/clickhouselogsexporter/config_test.go b/exporter/clickhouselogsexporter/config_test.go index 1b6ab332..7fcf3400 100644 --- a/exporter/clickhouselogsexporter/config_test.go +++ b/exporter/clickhouselogsexporter/config_test.go @@ -37,7 +37,7 @@ func TestLoadConfig(t *testing.T) { require.NoError(t, err) require.NotNil(t, cfg) - assert.Equal(t, len(cfg.Exporters), 2) + assert.Equal(t, len(cfg.Exporters), 3) defaultCfg := factory.CreateDefaultConfig() defaultCfg.(*Config).DSN = "tcp://127.0.0.1:9000/?dial_timeout=5s" @@ -64,4 +64,8 @@ func TestLoadConfig(t *testing.T) { QueueSize: 100, }, }) + + defaultCfg.(*Config).UseNewSchema = true + r2 := cfg.Exporters[component.NewIDWithName(component.MustNewType(typeStr), "new_schema")] + assert.Equal(t, r2, defaultCfg) } diff --git a/exporter/clickhouselogsexporter/exporter.go b/exporter/clickhouselogsexporter/exporter.go index 0037a150..213e7e87 100644 --- a/exporter/clickhouselogsexporter/exporter.go +++ b/exporter/clickhouselogsexporter/exporter.go @@ -63,6 +63,8 @@ type clickhouseLogsExporter struct { wg *sync.WaitGroup closeChan chan struct{} + + useNewSchema bool } func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, error) { @@ -109,6 +111,7 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, erro usageCollector: collector, wg: new(sync.WaitGroup), closeChan: make(chan struct{}), + useNewSchema: cfg.UseNewSchema, }, nil } @@ -274,7 +277,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L } resourceJson := string(serializedRes) - err = addAttrsToTagStatement(tagStatement, "resource", resources) + err = addAttrsToTagStatement(tagStatement, "resource", resources, e.useNewSchema) if err != nil { return err } @@ -290,7 +293,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L scopeAttributes := attributesToSlice(scope.Attributes(), true) scopeMap := attributesToMap(scope.Attributes(), true) - err := addAttrsToTagStatement(tagStatement, "scope", scopeAttributes) + err := addAttrsToTagStatement(tagStatement, "scope", scopeAttributes, e.useNewSchema) if err != nil { return err } @@ -328,7 +331,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L attributes := attributesToSlice(r.Attributes(), false) attrsMap := attributesToMap(r.Attributes(), false) - err := addAttrsToTagStatement(tagStatement, "tag", attributes) + err := addAttrsToTagStatement(tagStatement, "tag", attributes, e.useNewSchema) if err != nil { return err } @@ -507,7 +510,7 @@ func getStringifiedBody(body pcommon.Value) string { return strBody } -func addAttrsToTagStatement(statement driver.Batch, tagType string, attrs attributesToSliceResponse) error { +func addAttrsToTagStatement(statement driver.Batch, tagType string, attrs attributesToSliceResponse, useNewSchema bool) error { for i, v := range attrs.StringKeys { err := statement.Append( time.Now(), @@ -522,12 +525,17 @@ func addAttrsToTagStatement(statement driver.Batch, tagType string, attrs attrib return fmt.Errorf("could not append string attribute to batch, err: %s", err) } } + + intTypeName := "int64" + if useNewSchema { + intTypeName = "float64" + } for i, v := range attrs.IntKeys { err := statement.Append( time.Now(), v, tagType, - "float64", + intTypeName, nil, nil, attrs.IntValues[i], From 36dfc54f93a15bc2331d8a8da1e5efe3d567d712 Mon Sep 17 00:00:00 2001 From: nityanandagohain Date: Tue, 27 Aug 2024 15:53:33 +0530 Subject: [PATCH 2/2] fix: add test config --- exporter/clickhouselogsexporter/testdata/config.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/exporter/clickhouselogsexporter/testdata/config.yaml b/exporter/clickhouselogsexporter/testdata/config.yaml index f209f571..21313296 100644 --- a/exporter/clickhouselogsexporter/testdata/config.yaml +++ b/exporter/clickhouselogsexporter/testdata/config.yaml @@ -19,6 +19,9 @@ exporters: multiplier: 1.3 sending_queue: queue_size: 100 + clickhouselogsexporter/new_schema: + dsn: tcp://127.0.0.1:9000/?dial_timeout=5s + use_new_schema: true service: pipelines: