Skip to content

Commit

Permalink
chore: flush spans before exit (#232)
Browse files Browse the repository at this point in the history
  • Loading branch information
makeavish authored Nov 23, 2023
1 parent 045533c commit 4a2d0ca
Showing 1 changed file with 48 additions and 27 deletions.
75 changes: 48 additions & 27 deletions exporter/clickhousetracesexporter/clickhouse_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ import (
"context"
"crypto/md5"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/url"
"strconv"
"strings"
"sync"

"github.com/SigNoz/signoz-otel-collector/usage"
"github.com/SigNoz/signoz-otel-collector/utils"
Expand Down Expand Up @@ -71,7 +73,15 @@ func newExporter(cfg component.Config, logger *zap.Logger) (*storage, error) {
return nil, err
}

storage := storage{Writer: spanWriter, usageCollector: collector, config: storageConfig{lowCardinalExceptionGrouping: configClickHouse.LowCardinalExceptionGrouping}}
storage := storage{
Writer: spanWriter,
usageCollector: collector,
config: storageConfig{
lowCardinalExceptionGrouping: configClickHouse.LowCardinalExceptionGrouping,
},
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
}

return &storage, nil
}
Expand All @@ -80,6 +90,8 @@ type storage struct {
Writer Writer
usageCollector *usage.UsageCollector
config storageConfig
wg *sync.WaitGroup
closeChan chan struct{}
}

type storageConfig struct {
Expand Down Expand Up @@ -383,39 +395,48 @@ func newStructuredSpan(otelSpan ptrace.Span, ServiceName string, resource pcommo

// traceDataPusher implements OTEL exporterhelper.traceDataPusher
func (s *storage) pushTraceData(ctx context.Context, td ptrace.Traces) error {

rss := td.ResourceSpans()
var batchOfSpans []*Span
for i := 0; i < rss.Len(); i++ {
// fmt.Printf("ResourceSpans #%d\n", i)
rs := rss.At(i)

serviceName := ServiceNameForResource(rs.Resource())

ilss := rs.ScopeSpans()
for j := 0; j < ilss.Len(); j++ {
// fmt.Printf("InstrumentationLibrarySpans #%d\n", j)
ils := ilss.At(j)

spans := ils.Spans()

for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
structuredSpan := newStructuredSpan(span, serviceName, rs.Resource(), s.config)
batchOfSpans = append(batchOfSpans, structuredSpan)
s.wg.Add(1)
defer s.wg.Done()

select {
case <-s.closeChan:
return errors.New("shutdown has been called")
default:
rss := td.ResourceSpans()
var batchOfSpans []*Span
for i := 0; i < rss.Len(); i++ {
rs := rss.At(i)

serviceName := ServiceNameForResource(rs.Resource())

ilss := rs.ScopeSpans()
for j := 0; j < ilss.Len(); j++ {
ils := ilss.At(j)

spans := ils.Spans()

for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
structuredSpan := newStructuredSpan(span, serviceName, rs.Resource(), s.config)
batchOfSpans = append(batchOfSpans, structuredSpan)
}
}
}
err := s.Writer.WriteBatchOfSpans(batchOfSpans)
if err != nil {
zap.S().Error("Error in writing spans to clickhouse: ", err)
return err
}
return nil
}
err := s.Writer.WriteBatchOfSpans(batchOfSpans)
if err != nil {
zap.S().Error("Error in writing spans to clickhouse: ", err)
return err
}
return nil
}

// Shutdown will shutdown the exporter.
func (s *storage) Shutdown(_ context.Context) error {

close(s.closeChan)
s.wg.Wait()

if s.usageCollector != nil {
s.usageCollector.Stop()
}
Expand Down

0 comments on commit 4a2d0ca

Please sign in to comment.