diff --git a/exporter/clickhousetracesexporter/clickhouse_exporter.go b/exporter/clickhousetracesexporter/clickhouse_exporter.go
index 2f46a20e..25176f15 100644
--- a/exporter/clickhousetracesexporter/clickhouse_exporter.go
+++ b/exporter/clickhousetracesexporter/clickhouse_exporter.go
@@ -385,6 +385,7 @@ func newStructuredSpan(otelSpan ptrace.Span, ServiceName string, resource pcommo
 func (s *storage) pushTraceData(ctx context.Context, td ptrace.Traces) error {
 
     rss := td.ResourceSpans()
+    var batchOfSpans []*Span
     for i := 0; i < rss.Len(); i++ {
         // fmt.Printf("ResourceSpans #%d\n", i)
         rs := rss.At(i)
@@ -400,16 +401,15 @@ func (s *storage) pushTraceData(ctx context.Context, td ptrace.Traces) error {
 
             for k := 0; k < spans.Len(); k++ {
                 span := spans.At(k)
-                // traceID := hex.EncodeToString(span.TraceID())
                 structuredSpan := newStructuredSpan(span, serviceName, rs.Resource(), s.config)
-                err := s.Writer.WriteSpan(structuredSpan)
-                if err != nil {
-                    zap.S().Error("Error in writing spans to clickhouse: ", err)
-                }
+                batchOfSpans = append(batchOfSpans, structuredSpan)
             }
         }
     }
-
+    err := s.Writer.WriteBatchOfSpans(batchOfSpans)
+    if err != nil {
+        zap.S().Error("Error in writing spans to clickhouse: ", err)
+    }
     return nil
 }
 
diff --git a/exporter/clickhousetracesexporter/clickhouse_factory.go b/exporter/clickhousetracesexporter/clickhouse_factory.go
index aa957301..2fae266e 100644
--- a/exporter/clickhousetracesexporter/clickhouse_factory.go
+++ b/exporter/clickhousetracesexporter/clickhouse_factory.go
@@ -43,7 +43,7 @@ type Factory struct {
 
 // Writer writes spans to storage.
 type Writer interface {
-    WriteSpan(span *Span) error
+    WriteBatchOfSpans(span []*Span) error
 }
 
 type writerMaker func(WriterOptions) (Writer, error)
@@ -281,8 +281,6 @@ func (f *Factory) CreateSpanWriter() (Writer, error) {
         attributeTable:    cfg.AttributeTable,
         attributeKeyTable: cfg.AttributeKeyTable,
         encoding:          cfg.Encoding,
-        delay:             cfg.WriteBatchDelay,
-        size:              cfg.WriteBatchSize,
     })
 }
 
@@ -302,8 +300,6 @@ func (f *Factory) CreateArchiveSpanWriter() (Writer, error) {
         attributeTable:    cfg.AttributeTable,
         attributeKeyTable: cfg.AttributeKeyTable,
         encoding:          cfg.Encoding,
-        delay:             cfg.WriteBatchDelay,
-        size:              cfg.WriteBatchSize,
     })
 }
 
diff --git a/exporter/clickhousetracesexporter/options.go b/exporter/clickhousetracesexporter/options.go
index 2f16657e..db182916 100644
--- a/exporter/clickhousetracesexporter/options.go
+++ b/exporter/clickhousetracesexporter/options.go
@@ -19,34 +19,31 @@ import (
     "flag"
     "fmt"
     "net/url"
-    "time"
 
     "github.com/ClickHouse/clickhouse-go/v2"
     "github.com/spf13/viper"
 )
 
 const (
-    defaultDatasource               string        = "tcp://127.0.0.1:9000/?database=signoz_traces"
-    defaultTraceDatabase            string        = "signoz_traces"
-    defaultMigrations               string        = "/migrations"
-    defaultOperationsTable          string        = "distributed_signoz_operations"
-    defaultIndexTable               string        = "distributed_signoz_index_v2"
-    localIndexTable                 string        = "signoz_index_v2"
-    defaultErrorTable               string        = "distributed_signoz_error_index_v2"
-    defaultSpansTable               string        = "distributed_signoz_spans"
-    defaultAttributeTable           string        = "distributed_span_attributes"
-    defaultAttributeKeyTable        string        = "distributed_span_attributes_keys"
-    defaultDurationSortTable        string        = "durationSort"
-    defaultDurationSortMVTable      string        = "durationSortMV"
-    defaultArchiveSpansTable        string        = "signoz_archive_spans"
-    defaultClusterName              string        = "cluster"
-    defaultDependencyGraphTable     string        = "dependency_graph_minutes"
-    defaultDependencyGraphServiceMV string        = "dependency_graph_minutes_service_calls_mv"
-    defaultDependencyGraphDbMV      string        = "dependency_graph_minutes_db_calls_mv"
-    DependencyGraphMessagingMV      string        = "dependency_graph_minutes_messaging_calls_mv"
-    defaultWriteBatchDelay          time.Duration = 2 * time.Second
-    defaultWriteBatchSize           int           = 100000
-    defaultEncoding                 Encoding      = EncodingJSON
+    defaultDatasource               string   = "tcp://127.0.0.1:9000/?database=signoz_traces"
+    defaultTraceDatabase            string   = "signoz_traces"
+    defaultMigrations               string   = "/migrations"
+    defaultOperationsTable          string   = "distributed_signoz_operations"
+    defaultIndexTable               string   = "distributed_signoz_index_v2"
+    localIndexTable                 string   = "signoz_index_v2"
+    defaultErrorTable               string   = "distributed_signoz_error_index_v2"
+    defaultSpansTable               string   = "distributed_signoz_spans"
+    defaultAttributeTable           string   = "distributed_span_attributes"
+    defaultAttributeKeyTable        string   = "distributed_span_attributes_keys"
+    defaultDurationSortTable        string   = "durationSort"
+    defaultDurationSortMVTable      string   = "durationSortMV"
+    defaultArchiveSpansTable        string   = "signoz_archive_spans"
+    defaultClusterName              string   = "cluster"
+    defaultDependencyGraphTable     string   = "dependency_graph_minutes"
+    defaultDependencyGraphServiceMV string   = "dependency_graph_minutes_service_calls_mv"
+    defaultDependencyGraphDbMV      string   = "dependency_graph_minutes_db_calls_mv"
+    DependencyGraphMessagingMV      string   = "dependency_graph_minutes_messaging_calls_mv"
+    defaultEncoding                 Encoding = EncodingJSON
 )
 
 const (
@@ -57,8 +54,6 @@ const (
     suffixOperationsTable = ".operations-table"
     suffixIndexTable      = ".index-table"
     suffixSpansTable      = ".spans-table"
-    suffixWriteBatchDelay = ".write-batch-delay"
-    suffixWriteBatchSize  = ".write-batch-size"
     suffixEncoding        = ".encoding"
 )
 
@@ -84,8 +79,6 @@ type namespaceConfig struct {
     DependencyGraphMessagingMV string
     DependencyGraphTable       string
     DockerMultiNodeCluster     bool
-    WriteBatchDelay            time.Duration
-    WriteBatchSize             int
     Encoding                   Encoding
     Connector                  Connector
 }
@@ -161,8 +154,6 @@ func NewOptions(migrations string, datasource string, dockerMultiNodeCluster boo
             DependencyGraphDbMV:        defaultDependencyGraphDbMV,
             DependencyGraphMessagingMV: DependencyGraphMessagingMV,
             DockerMultiNodeCluster:     dockerMultiNodeCluster,
-            WriteBatchDelay:            defaultWriteBatchDelay,
-            WriteBatchSize:             defaultWriteBatchSize,
             Encoding:                   defaultEncoding,
             Connector:                  defaultConnector,
         },
@@ -178,8 +169,6 @@ func NewOptions(migrations string, datasource string, dockerMultiNodeCluster boo
             OperationsTable: "",
             IndexTable:      "",
             SpansTable:      defaultArchiveSpansTable,
-            WriteBatchDelay: defaultWriteBatchDelay,
-            WriteBatchSize:  defaultWriteBatchSize,
             Encoding:        defaultEncoding,
             Connector:       defaultConnector,
         }
@@ -233,18 +222,6 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
         "Clickhouse spans table name.",
     )
 
-    flagSet.Duration(
-        nsConfig.namespace+suffixWriteBatchDelay,
-        nsConfig.WriteBatchDelay,
-        "A duration after which spans are flushed to Clickhouse",
-    )
-
-    flagSet.Int(
-        nsConfig.namespace+suffixWriteBatchSize,
-        nsConfig.WriteBatchSize,
-        "A number of spans buffered before they are flushed to Clickhouse",
-    )
-
     flagSet.String(
         nsConfig.namespace+suffixEncoding,
         string(nsConfig.Encoding),
@@ -267,8 +244,6 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
     cfg.IndexTable = v.GetString(cfg.namespace + suffixIndexTable)
     cfg.SpansTable = v.GetString(cfg.namespace + suffixSpansTable)
     cfg.OperationsTable = v.GetString(cfg.namespace + suffixOperationsTable)
-    cfg.WriteBatchDelay = v.GetDuration(cfg.namespace + suffixWriteBatchDelay)
-    cfg.WriteBatchSize = v.GetInt(cfg.namespace + suffixWriteBatchSize)
     cfg.Encoding = Encoding(v.GetString(cfg.namespace + suffixEncoding))
 }
 
diff --git a/exporter/clickhousetracesexporter/writer.go b/exporter/clickhousetracesexporter/writer.go
index f8ea7e45..4ebfe1e0 100644
--- a/exporter/clickhousetracesexporter/writer.go
+++ b/exporter/clickhousetracesexporter/writer.go
@@ -20,7 +20,6 @@ import (
     "fmt"
     "math"
     "strings"
-    "sync"
     "time"
 
     "github.com/ClickHouse/clickhouse-go/v2"
@@ -52,11 +51,6 @@ type SpanWriter struct {
     attributeTable    string
     attributeKeyTable string
     encoding          Encoding
-    delay             time.Duration
-    size              int
-    spans             chan *Span
-    finish            chan bool
-    done              sync.WaitGroup
 }
 
 type WriterOptions struct {
@@ -69,8 +63,6 @@ type WriterOptions struct {
     attributeTable    string
     attributeKeyTable string
     encoding          Encoding
-    delay             time.Duration
-    size              int
 }
 
 // NewSpanWriter returns a SpanWriter for the database
@@ -88,91 +80,11 @@ func NewSpanWriter(options WriterOptions) *SpanWriter {
         attributeTable:    options.attributeTable,
         attributeKeyTable: options.attributeKeyTable,
         encoding:          options.encoding,
-        delay:             options.delay,
-        size:              options.size,
-        spans:             make(chan *Span, options.size),
-        finish:            make(chan bool),
     }
 
-    go writer.backgroundWriter()
-
     return writer
 }
 
-func (w *SpanWriter) backgroundWriter() {
-    batch := make([]*Span, 0, w.size)
-
-    timer := time.After(w.delay)
-    last := time.Now()
-
-    for {
-        w.done.Add(1)
-
-        flush := false
-        finish := false
-
-        select {
-        case span := <-w.spans:
-            batch = append(batch, span)
-            flush = len(batch) == cap(batch)
-        case <-timer:
-            timer = time.After(w.delay)
-            flush = time.Since(last) > w.delay && len(batch) > 0
-        case <-w.finish:
-            finish = true
-            flush = len(batch) > 0
-        }
-
-        if flush {
-            if err := w.writeBatch(batch); err != nil {
-                w.logger.Error("Could not write a batch of spans", zap.Error(err))
-            }
-
-            batch = make([]*Span, 0, w.size)
-            last = time.Now()
-        }
-
-        w.done.Done()
-
-        if finish {
-            break
-        }
-    }
-}
-
-func (w *SpanWriter) writeBatch(batch []*Span) error {
-
-    if w.spansTable != "" {
-        if err := w.writeModelBatch(batch); err != nil {
-            logBatch := batch[:int(math.Min(10, float64(len(batch))))]
-            w.logger.Error("Could not write a batch of spans to model table: ", zap.Any("batch", logBatch), zap.Error(err))
-            return err
-        }
-    }
-    if w.indexTable != "" {
-        if err := w.writeIndexBatch(batch); err != nil {
-            logBatch := batch[:int(math.Min(10, float64(len(batch))))]
-            w.logger.Error("Could not write a batch of spans to index table: ", zap.Any("batch", logBatch), zap.Error(err))
-            return err
-        }
-    }
-    if w.errorTable != "" {
-        if err := w.writeErrorBatch(batch); err != nil {
-            logBatch := batch[:int(math.Min(10, float64(len(batch))))]
-            w.logger.Error("Could not write a batch of spans to error table: ", zap.Any("batch", logBatch), zap.Error(err))
-            return err
-        }
-    }
-    if w.attributeTable != "" && w.attributeKeyTable != "" {
-        if err := w.writeTagBatch(batch); err != nil {
-            w.logger.Error("Could not write a batch of spans to tag/tagKey tables: ", zap.Error(err))
-            return err
-        }
-    }
-
-    return nil
-}
-
 func (w *SpanWriter) writeIndexBatch(batchSpans []*Span) error {
 
     ctx := context.Background()
@@ -439,15 +351,42 @@ func (w *SpanWriter) writeModelBatch(batchSpans []*Span) error {
     return nil
 }
 
-// WriteSpan writes the encoded span
-func (w *SpanWriter) WriteSpan(span *Span) error {
-    w.spans <- span
+// WriteBatchOfSpans writes the encoded batch of spans
+func (w *SpanWriter) WriteBatchOfSpans(batch []*Span) error {
+    if w.spansTable != "" {
err := w.writeModelBatch(batch); err != nil { + logBatch := batch[:int(math.Min(10, float64(len(batch))))] + w.logger.Error("Could not write a batch of spans to model table: ", zap.Any("batch", logBatch), zap.Error(err)) + return err + } + } + if w.indexTable != "" { + if err := w.writeIndexBatch(batch); err != nil { + logBatch := batch[:int(math.Min(10, float64(len(batch))))] + w.logger.Error("Could not write a batch of spans to index table: ", zap.Any("batch", logBatch), zap.Error(err)) + return err + } + } + if w.errorTable != "" { + if err := w.writeErrorBatch(batch); err != nil { + logBatch := batch[:int(math.Min(10, float64(len(batch))))] + w.logger.Error("Could not write a batch of spans to error table: ", zap.Any("batch", logBatch), zap.Error(err)) + return err + } + } + if w.attributeTable != "" && w.attributeKeyTable != "" { + if err := w.writeTagBatch(batch); err != nil { + w.logger.Error("Could not write a batch of spans to tag/tagKey tables: ", zap.Error(err)) + return err + } + } return nil } -// Close Implements io.Closer and closes the underlying storage +// Close closes the writer func (w *SpanWriter) Close() error { - w.finish <- true - w.done.Wait() + if w.db != nil { + return w.db.Close() + } return nil }