Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: write int64 tag attributes for old schema #373

Merged
merged 2 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions exporter/clickhouselogsexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Config struct {
DSN string `mapstructure:"dsn"`
// Docker Multi Node Cluster is a flag to enable the docker multi node cluster. Default is false.
DockerMultiNodeCluster bool `mapstructure:"docker_multi_node_cluster" default:"false"`
UseNewSchema bool `mapstructure:"use_new_schema" default:"false"`
}

var (
Expand Down
6 changes: 5 additions & 1 deletion exporter/clickhouselogsexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestLoadConfig(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, cfg)

assert.Equal(t, len(cfg.Exporters), 2)
assert.Equal(t, len(cfg.Exporters), 3)

defaultCfg := factory.CreateDefaultConfig()
defaultCfg.(*Config).DSN = "tcp://127.0.0.1:9000/?dial_timeout=5s"
Expand All @@ -64,4 +64,8 @@ func TestLoadConfig(t *testing.T) {
QueueSize: 100,
},
})

defaultCfg.(*Config).UseNewSchema = true
r2 := cfg.Exporters[component.NewIDWithName(component.MustNewType(typeStr), "new_schema")]
assert.Equal(t, r2, defaultCfg)
}
18 changes: 13 additions & 5 deletions exporter/clickhouselogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type clickhouseLogsExporter struct {

wg *sync.WaitGroup
closeChan chan struct{}

useNewSchema bool
}

func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, error) {
Expand Down Expand Up @@ -109,6 +111,7 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, erro
usageCollector: collector,
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
useNewSchema: cfg.UseNewSchema,
}, nil
}

Expand Down Expand Up @@ -274,7 +277,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
}
resourceJson := string(serializedRes)

err = addAttrsToTagStatement(tagStatement, "resource", resources)
err = addAttrsToTagStatement(tagStatement, "resource", resources, e.useNewSchema)
if err != nil {
return err
}
Expand All @@ -290,7 +293,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
scopeAttributes := attributesToSlice(scope.Attributes(), true)
scopeMap := attributesToMap(scope.Attributes(), true)

err := addAttrsToTagStatement(tagStatement, "scope", scopeAttributes)
err := addAttrsToTagStatement(tagStatement, "scope", scopeAttributes, e.useNewSchema)
if err != nil {
return err
}
Expand Down Expand Up @@ -328,7 +331,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
attributes := attributesToSlice(r.Attributes(), false)
attrsMap := attributesToMap(r.Attributes(), false)

err := addAttrsToTagStatement(tagStatement, "tag", attributes)
err := addAttrsToTagStatement(tagStatement, "tag", attributes, e.useNewSchema)
if err != nil {
return err
}
Expand Down Expand Up @@ -507,7 +510,7 @@ func getStringifiedBody(body pcommon.Value) string {
return strBody
}

func addAttrsToTagStatement(statement driver.Batch, tagType string, attrs attributesToSliceResponse) error {
func addAttrsToTagStatement(statement driver.Batch, tagType string, attrs attributesToSliceResponse, useNewSchema bool) error {
for i, v := range attrs.StringKeys {
err := statement.Append(
time.Now(),
Expand All @@ -522,12 +525,17 @@ func addAttrsToTagStatement(statement driver.Batch, tagType string, attrs attrib
return fmt.Errorf("could not append string attribute to batch, err: %s", err)
}
}

intTypeName := "int64"
if useNewSchema {
intTypeName = "float64"
}
for i, v := range attrs.IntKeys {
err := statement.Append(
time.Now(),
v,
tagType,
"float64",
intTypeName,
nil,
nil,
attrs.IntValues[i],
Expand Down
3 changes: 3 additions & 0 deletions exporter/clickhouselogsexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ exporters:
multiplier: 1.3
sending_queue:
queue_size: 100
clickhouselogsexporter/new_schema:
dsn: tcp://127.0.0.1:9000/?dial_timeout=5s
use_new_schema: true

service:
pipelines:
Expand Down
Loading