Skip to content

Commit

Permalink
fix: write int64 tag attributes for old schema (#373)
Browse files Browse the repository at this point in the history
* fix: write int64 tag attributes for old schema

* fix: add test config
  • Loading branch information
nityanandagohain authored Aug 27, 2024
1 parent 06bf82d commit a324347
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 6 deletions.
1 change: 1 addition & 0 deletions exporter/clickhouselogsexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Config struct {
DSN string `mapstructure:"dsn"`
// Docker Multi Node Cluster is a flag to enable the docker multi node cluster. Default is false.
DockerMultiNodeCluster bool `mapstructure:"docker_multi_node_cluster" default:"false"`
UseNewSchema bool `mapstructure:"use_new_schema" default:"false"`
}

var (
Expand Down
6 changes: 5 additions & 1 deletion exporter/clickhouselogsexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestLoadConfig(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, cfg)

assert.Equal(t, len(cfg.Exporters), 2)
assert.Equal(t, len(cfg.Exporters), 3)

defaultCfg := factory.CreateDefaultConfig()
defaultCfg.(*Config).DSN = "tcp://127.0.0.1:9000/?dial_timeout=5s"
Expand All @@ -64,4 +64,8 @@ func TestLoadConfig(t *testing.T) {
QueueSize: 100,
},
})

defaultCfg.(*Config).UseNewSchema = true
r2 := cfg.Exporters[component.NewIDWithName(component.MustNewType(typeStr), "new_schema")]
assert.Equal(t, r2, defaultCfg)
}
18 changes: 13 additions & 5 deletions exporter/clickhouselogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type clickhouseLogsExporter struct {

wg *sync.WaitGroup
closeChan chan struct{}

useNewSchema bool
}

func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, error) {
Expand Down Expand Up @@ -109,6 +111,7 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, erro
usageCollector: collector,
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
useNewSchema: cfg.UseNewSchema,
}, nil
}

Expand Down Expand Up @@ -274,7 +277,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
}
resourceJson := string(serializedRes)

err = addAttrsToTagStatement(tagStatement, "resource", resources)
err = addAttrsToTagStatement(tagStatement, "resource", resources, e.useNewSchema)
if err != nil {
return err
}
Expand All @@ -290,7 +293,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
scopeAttributes := attributesToSlice(scope.Attributes(), true)
scopeMap := attributesToMap(scope.Attributes(), true)

err := addAttrsToTagStatement(tagStatement, "scope", scopeAttributes)
err := addAttrsToTagStatement(tagStatement, "scope", scopeAttributes, e.useNewSchema)
if err != nil {
return err
}
Expand Down Expand Up @@ -328,7 +331,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
attributes := attributesToSlice(r.Attributes(), false)
attrsMap := attributesToMap(r.Attributes(), false)

err := addAttrsToTagStatement(tagStatement, "tag", attributes)
err := addAttrsToTagStatement(tagStatement, "tag", attributes, e.useNewSchema)
if err != nil {
return err
}
Expand Down Expand Up @@ -507,7 +510,7 @@ func getStringifiedBody(body pcommon.Value) string {
return strBody
}

func addAttrsToTagStatement(statement driver.Batch, tagType string, attrs attributesToSliceResponse) error {
func addAttrsToTagStatement(statement driver.Batch, tagType string, attrs attributesToSliceResponse, useNewSchema bool) error {
for i, v := range attrs.StringKeys {
err := statement.Append(
time.Now(),
Expand All @@ -522,12 +525,17 @@ func addAttrsToTagStatement(statement driver.Batch, tagType string, attrs attrib
return fmt.Errorf("could not append string attribute to batch, err: %s", err)
}
}

intTypeName := "int64"
if useNewSchema {
intTypeName = "float64"
}
for i, v := range attrs.IntKeys {
err := statement.Append(
time.Now(),
v,
tagType,
"float64",
intTypeName,
nil,
nil,
attrs.IntValues[i],
Expand Down
3 changes: 3 additions & 0 deletions exporter/clickhouselogsexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ exporters:
multiplier: 1.3
sending_queue:
queue_size: 100
clickhouselogsexporter/new_schema:
dsn: tcp://127.0.0.1:9000/?dial_timeout=5s
use_new_schema: true

service:
pipelines:
Expand Down

0 comments on commit a324347

Please sign in to comment.