Skip to content

Commit

Permalink
feat: fix duration collection
Browse files Browse the repository at this point in the history
  • Loading branch information
nityanandagohain committed Aug 22, 2024
1 parent e724cca commit 4904ddb
Showing 1 changed file with 19 additions and 11 deletions.
30 changes: 19 additions & 11 deletions exporter/clickhouselogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,24 +415,24 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L

var wg sync.WaitGroup
chErr := make(chan error, 3)
chDuration := make(chan time.Duration, 3)
chDuration := make(chan statementSendDuration, 3)

wg.Add(3)
go send(statement, chDuration, chErr, &wg)
go send(insertLogsStmtV2, chDuration, chErr, &wg)
go send(insertResourcesStmtV2, chDuration, chErr, &wg)
go send(statement, DISTRIBUTED_LOGS_TABLE, chDuration, chErr, &wg)
go send(insertLogsStmtV2, DISTRIBUTED_LOGS_TABLE_V2, chDuration, chErr, &wg)
go send(insertResourcesStmtV2, DISTRIBUTED_LOGS_RESOURCE_V2, chDuration, chErr, &wg)
wg.Wait()
close(chErr)

// store the duration to send the data
for _, table := range []string{DISTRIBUTED_LOGS_TABLE, DISTRIBUTED_LOGS_TABLE_V2, DISTRIBUTED_LOGS_RESOURCE_V2} {
duration := <-chDuration
// store the duration for send the data
for i := 0; i < 3; i++ {
sendDuration := <-chDuration
stats.RecordWithTags(ctx,
[]tag.Mutator{
tag.Upsert(exporterKey, component.DataTypeLogs.String()),
tag.Upsert(tableKey, table),
tag.Upsert(tableKey, sendDuration.Name),
},
writeLatencyMillis.M(int64(duration.Milliseconds())),
writeLatencyMillis.M(int64(sendDuration.duration.Milliseconds())),
)
}

Expand Down Expand Up @@ -469,12 +469,20 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
}
}

func send(statement driver.Batch, durationCh chan<- time.Duration, chErr chan<- error, wg *sync.WaitGroup) {
type statementSendDuration struct {
Name string
duration time.Duration
}

func send(statement driver.Batch, tableName string, durationCh chan<- statementSendDuration, chErr chan<- error, wg *sync.WaitGroup) {
defer wg.Done()
start := time.Now()
err := statement.Send()
chErr <- err
durationCh <- time.Since(start)
durationCh <- statementSendDuration{
Name: tableName,
duration: time.Since(start),
}
}

type attributesToSliceResponse struct {
Expand Down

0 comments on commit 4904ddb

Please sign in to comment.