Skip to content

Commit

Permalink
feat: write to old table only if use_new_schema=false
Browse files Browse the repository at this point in the history
  • Loading branch information
nityanandagohain committed Aug 28, 2024
1 parent a324347 commit 5f4215a
Showing 1 changed file with 42 additions and 34 deletions.
76 changes: 42 additions & 34 deletions exporter/clickhouselogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,13 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
return errors.New("shutdown has been called")
default:
start := time.Now()
statement, err = e.db.PrepareBatch(ctx, e.insertLogsSQL, driver.WithReleaseConnection())
if err != nil {
return fmt.Errorf("PrepareBatch:%w", err)
chLen := 2
if !e.useNewSchema {
chLen = 3
statement, err = e.db.PrepareBatch(ctx, e.insertLogsSQL, driver.WithReleaseConnection())
if err != nil {
return fmt.Errorf("PrepareBatch:%w", err)
}
}

tagStatement, err = e.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", databaseName, DISTRIBUTED_TAG_ATTRIBUTES), driver.WithReleaseConnection())
Expand Down Expand Up @@ -364,31 +368,33 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
}

// old table
err = statement.Append(
ts,
ots,
e.ksuid.String(),
utils.TraceIDToHexOrEmptyString(r.TraceID()),
utils.SpanIDToHexOrEmptyString(r.SpanID()),
uint32(r.Flags()),
r.SeverityText(),
uint8(r.SeverityNumber()),
getStringifiedBody(r.Body()),
resources.StringKeys,
resources.StringValues,
attributes.StringKeys,
attributes.StringValues,
attributes.IntKeys,
attributes.IntValues,
attributes.FloatKeys,
attributes.FloatValues,
attributes.BoolKeys,
attributes.BoolValues,
scopeName,
scopeVersion,
scopeAttributes.StringKeys,
scopeAttributes.StringValues,
)
if !e.useNewSchema {
err = statement.Append(
ts,
ots,
e.ksuid.String(),
utils.TraceIDToHexOrEmptyString(r.TraceID()),
utils.SpanIDToHexOrEmptyString(r.SpanID()),
uint32(r.Flags()),
r.SeverityText(),
uint8(r.SeverityNumber()),
getStringifiedBody(r.Body()),
resources.StringKeys,
resources.StringValues,
attributes.StringKeys,
attributes.StringValues,
attributes.IntKeys,
attributes.IntValues,
attributes.FloatKeys,
attributes.FloatValues,
attributes.BoolKeys,
attributes.BoolValues,
scopeName,
scopeVersion,
scopeAttributes.StringKeys,
scopeAttributes.StringValues,
)
}
if err != nil {
return fmt.Errorf("StatementAppend:%w", err)
}
Expand Down Expand Up @@ -417,18 +423,20 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
}

var wg sync.WaitGroup
chErr := make(chan error, 3)
chDuration := make(chan statementSendDuration, 3)
chErr := make(chan error, chLen)
chDuration := make(chan statementSendDuration, chLen)

wg.Add(3)
go send(statement, DISTRIBUTED_LOGS_TABLE, chDuration, chErr, &wg)
wg.Add(chLen)
if !e.useNewSchema {
go send(statement, DISTRIBUTED_LOGS_TABLE, chDuration, chErr, &wg)
}
go send(insertLogsStmtV2, DISTRIBUTED_LOGS_TABLE_V2, chDuration, chErr, &wg)
go send(insertResourcesStmtV2, DISTRIBUTED_LOGS_RESOURCE_V2, chDuration, chErr, &wg)
wg.Wait()
close(chErr)

// store the duration for send the data
for i := 0; i < 3; i++ {
for i := 0; i < chLen; i++ {
sendDuration := <-chDuration
stats.RecordWithTags(ctx,
[]tag.Mutator{
Expand All @@ -440,7 +448,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
}

// check the errors
for i := 0; i < 3; i++ {
for i := 0; i < chLen; i++ {
if r := <-chErr; r != nil {
return fmt.Errorf("StatementSend:%w", r)
}
Expand Down

0 comments on commit 5f4215a

Please sign in to comment.