From bb0d7aeb40fdcfbf505aba60be25c1463413e41c Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Fri, 30 Aug 2024 10:43:37 +0530 Subject: [PATCH] feat: write to old table only if use_new_schema=false (#374) --- exporter/clickhouselogsexporter/exporter.go | 76 ++++++++++++--------- 1 file changed, 42 insertions(+), 34 deletions(-) diff --git a/exporter/clickhouselogsexporter/exporter.go b/exporter/clickhouselogsexporter/exporter.go index 213e7e87..6bf9d836 100644 --- a/exporter/clickhouselogsexporter/exporter.go +++ b/exporter/clickhouselogsexporter/exporter.go @@ -245,9 +245,13 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L return errors.New("shutdown has been called") default: start := time.Now() - statement, err = e.db.PrepareBatch(ctx, e.insertLogsSQL, driver.WithReleaseConnection()) - if err != nil { - return fmt.Errorf("PrepareBatch:%w", err) + chLen := 2 + if !e.useNewSchema { + chLen = 3 + statement, err = e.db.PrepareBatch(ctx, e.insertLogsSQL, driver.WithReleaseConnection()) + if err != nil { + return fmt.Errorf("PrepareBatch:%w", err) + } } tagStatement, err = e.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", databaseName, DISTRIBUTED_TAG_ATTRIBUTES), driver.WithReleaseConnection()) @@ -364,31 +368,33 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L } // old table - err = statement.Append( - ts, - ots, - e.ksuid.String(), - utils.TraceIDToHexOrEmptyString(r.TraceID()), - utils.SpanIDToHexOrEmptyString(r.SpanID()), - uint32(r.Flags()), - r.SeverityText(), - uint8(r.SeverityNumber()), - getStringifiedBody(r.Body()), - resources.StringKeys, - resources.StringValues, - attributes.StringKeys, - attributes.StringValues, - attributes.IntKeys, - attributes.IntValues, - attributes.FloatKeys, - attributes.FloatValues, - attributes.BoolKeys, - attributes.BoolValues, - scopeName, - scopeVersion, - scopeAttributes.StringKeys, - scopeAttributes.StringValues, - ) + if !e.useNewSchema { + err = statement.Append( + ts, + ots, + e.ksuid.String(), + utils.TraceIDToHexOrEmptyString(r.TraceID()), + utils.SpanIDToHexOrEmptyString(r.SpanID()), + uint32(r.Flags()), + r.SeverityText(), + uint8(r.SeverityNumber()), + getStringifiedBody(r.Body()), + resources.StringKeys, + resources.StringValues, + attributes.StringKeys, + attributes.StringValues, + attributes.IntKeys, + attributes.IntValues, + attributes.FloatKeys, + attributes.FloatValues, + attributes.BoolKeys, + attributes.BoolValues, + scopeName, + scopeVersion, + scopeAttributes.StringKeys, + scopeAttributes.StringValues, + ) + } if err != nil { return fmt.Errorf("StatementAppend:%w", err) } @@ -417,18 +423,20 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L } var wg sync.WaitGroup - chErr := make(chan error, 3) - chDuration := make(chan statementSendDuration, 3) + chErr := make(chan error, chLen) + chDuration := make(chan statementSendDuration, chLen) - wg.Add(3) - go send(statement, DISTRIBUTED_LOGS_TABLE, chDuration, chErr, &wg) + wg.Add(chLen) + if !e.useNewSchema { + go send(statement, DISTRIBUTED_LOGS_TABLE, chDuration, chErr, &wg) + } go send(insertLogsStmtV2, DISTRIBUTED_LOGS_TABLE_V2, chDuration, chErr, &wg) go send(insertResourcesStmtV2, DISTRIBUTED_LOGS_RESOURCE_V2, chDuration, chErr, &wg) wg.Wait() close(chErr) // store the duration for send the data - for i := 0; i < 3; i++ { + for i := 0; i < chLen; i++ { sendDuration := <-chDuration stats.RecordWithTags(ctx, []tag.Mutator{ @@ -440,7 +448,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L } // check the errors - for i := 0; i < 3; i++ { + for i := 0; i < chLen; i++ { if r := <-chErr; r != nil { return fmt.Errorf("StatementSend:%w", r) }