diff --git a/exporter/clickhousemetricsexporter/base/base.go b/exporter/clickhousemetricsexporter/base/base.go
index 0a3cd2f1..693dd44a 100644
--- a/exporter/clickhousemetricsexporter/base/base.go
+++ b/exporter/clickhousemetricsexporter/base/base.go
@@ -23,6 +23,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
+	"go.opentelemetry.io/collector/pdata/pmetric"
 	"github.com/prometheus/prometheus/prompb"
 )
 
@@ -34,7 +35,7 @@ type Storage interface {
 	// Read(context.Context, []Query) (*prompb.ReadResponse, error)
 
 	// Write puts data into storage.
-	Write(context.Context, *prompb.WriteRequest) error
+	Write(context.Context, *prompb.WriteRequest, map[string]pmetric.AggregationTemporality) error
 
 	// Returns the DB conn.
 	GetDBConn() interface{}
diff --git a/exporter/clickhousemetricsexporter/clickhouse.go b/exporter/clickhousemetricsexporter/clickhouse.go
index f2cc9097..6a1de323 100644
--- a/exporter/clickhousemetricsexporter/clickhouse.go
+++ b/exporter/clickhousemetricsexporter/clickhouse.go
@@ -31,6 +31,8 @@ import (
 	"go.opencensus.io/stats"
 	"go.opencensus.io/tag"
 	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+	semconv "go.opentelemetry.io/collector/semconv/v1.13.0"
 
 	"github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/base"
 	"github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/utils/timeseries"
@@ -39,14 +41,18 @@ import (
 )
 
 const (
-	namespace                     = "promhouse"
-	subsystem                     = "clickhouse"
-	nameLabel                     = "__name__"
-	CLUSTER                       = "cluster"
-	DISTRIBUTED_TIME_SERIES_TABLE = "distributed_time_series_v2"
-	DISTRIBUTED_SAMPLES_TABLE     = "distributed_samples_v2"
-	TIME_SERIES_TABLE             = "time_series_v2"
-	SAMPLES_TABLE                 = "samples_v2"
+	namespace                        = "promhouse"
+	subsystem                        = "clickhouse"
+	nameLabel                        = "__name__"
+	CLUSTER                          = "cluster"
+	DISTRIBUTED_TIME_SERIES_TABLE    = "distributed_time_series_v2"
+	DISTRIBUTED_TIME_SERIES_TABLE_V3 = "distributed_time_series_v3"
+	DISTRIBUTED_SAMPLES_TABLE        = "distributed_samples_v2"
+	TIME_SERIES_TABLE                = "time_series_v2"
+	TIME_SERIES_TABLE_V3             = "time_series_v3"
+	SAMPLES_TABLE                    = "samples_v2"
+	temporalityLabel                 = "__temporality__"
+	envLabel                         = "env"
 )
 
 // clickHouse implements storage interface for the ClickHouse.
@@ -60,7 +66,9 @@ type clickHouse struct {
 	// Maintains the lookup map for fingerprints that are
 	// written to time series table. This map is used to eliminate the
 	// unnecessary writes to table for the records that already exist.
-	timeSeries      map[uint64]struct{}
+	timeSeries      map[uint64]struct{}
+	prevShardCount  uint64
+	watcherInterval time.Duration
 
 	mWrittenTimeSeries prometheus.Counter
 }
@@ -70,6 +78,7 @@ type ClickHouseParams struct {
 	DropDatabase         bool
 	MaxOpenConns         int
 	MaxTimeSeriesInQuery int
+	WatcherInterval      time.Duration
 }
 
 func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
@@ -135,6 +144,35 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
 	queries = append(queries, fmt.Sprintf(`
 		ALTER TABLE %s.%s ON CLUSTER %s MODIFY SETTING ttl_only_drop_parts = 1;`, database, TIME_SERIES_TABLE, CLUSTER))
 
+	// Add temporality column to time_series table
+	queries = append(queries, fmt.Sprintf(`
+		ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS temporality LowCardinality(String) DEFAULT 'Unspecified' CODEC(ZSTD(5))`, database, TIME_SERIES_TABLE, CLUSTER))
+
+	queries = append(queries, fmt.Sprintf(`
+		ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS temporality LowCardinality(String) DEFAULT 'Unspecified' CODEC(ZSTD(5))`, database, DISTRIBUTED_TIME_SERIES_TABLE, CLUSTER))
+
+	// Add set index on temporality column
+	queries = append(queries, fmt.Sprintf(`
+		ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS temporality_index temporality TYPE SET(3) GRANULARITY 1`, database, TIME_SERIES_TABLE, CLUSTER))
+
+	// Create a new table
+	queries = append(queries, fmt.Sprintf(`
+		CREATE TABLE IF NOT EXISTS %s.%s ON CLUSTER %s (
+			env LowCardinality(String) DEFAULT 'default',
+			temporality LowCardinality(String) DEFAULT 'Unspecified',
+			metric_name LowCardinality(String),
+			fingerprint UInt64 CODEC(Delta, ZSTD),
+			timestamp_ms Int64 CODEC(Delta, ZSTD),
+			labels String CODEC(ZSTD(5))
+		)
+		ENGINE = ReplacingMergeTree
+			PARTITION BY toDate(timestamp_ms / 1000)
+			ORDER BY (env, temporality, metric_name, fingerprint);`, database, TIME_SERIES_TABLE_V3, CLUSTER))
+
+	// Create a new distributed table
+	queries = append(queries, fmt.Sprintf(`
+		CREATE TABLE IF NOT EXISTS %s.%s ON CLUSTER %s AS %s.%s ENGINE = Distributed("%s", %s, %s, cityHash64(env, temporality, metric_name, fingerprint));`, database, DISTRIBUTED_TIME_SERIES_TABLE_V3, CLUSTER, database, TIME_SERIES_TABLE_V3, CLUSTER, database, TIME_SERIES_TABLE_V3))
+
 	options := &clickhouse.Options{
 		Addr: []string{dsnURL.Host},
 	}
@@ -161,6 +199,15 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
 		}
 	}
 
+	// TODO(srikanthccv): Remove this once we have a better way to handle data and last write
+	removeTTL := fmt.Sprintf(`
+		ALTER TABLE %s.%s ON CLUSTER %s REMOVE TTL;`, database, TIME_SERIES_TABLE, CLUSTER)
+	if err = conn.Exec(context.Background(), removeTTL); err != nil {
+		if !strings.Contains(err.Error(), "Table doesn't have any table TTL expression, cannot remove.") {
+			return nil, err
+		}
+	}
+
 	ch := &clickHouse{
 		conn: conn,
 		l:    l,
@@ -175,60 +222,48 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
 			Name:      "written_time_series",
 			Help:      "Number of written time series.",
 		}),
+		watcherInterval: params.WatcherInterval,
 	}
 
 	go func() {
 		ctx := pprof.WithLabels(context.TODO(), pprof.Labels("component", "clickhouse_reloader"))
 		pprof.SetGoroutineLabels(ctx)
-		ch.runTimeSeriesReloader(ctx)
+		ch.shardCountWatcher(ctx)
 	}()
 
 	return ch, nil
 }
 
-// runTimeSeriesReloader periodically queries the time series table
-// and updates the timeSeries lookup map with new fingerprints.
-// One might wonder why is there a need to reload the data from clickhouse
-// when it just suffices to keep track of the fingerprint for the incoming
-// write requests. This is because there could be multiple instance of
-// metric exporters and they would only contain partial info with latter
-// approach.
-func (ch *clickHouse) runTimeSeriesReloader(ctx context.Context) {
-	ticker := time.NewTicker(5 * time.Second)
+func (ch *clickHouse) shardCountWatcher(ctx context.Context) {
+	ticker := time.NewTicker(ch.watcherInterval)
 	defer ticker.Stop()
 
-	q := fmt.Sprintf(`SELECT DISTINCT fingerprint FROM %s.%s`, ch.database, DISTRIBUTED_TIME_SERIES_TABLE)
+	q := `SELECT count() FROM system.clusters WHERE cluster='cluster'`
 	for {
-		ch.timeSeriesRW.RLock()
-		timeSeries := make(map[uint64]struct{}, len(ch.timeSeries))
-		ch.timeSeriesRW.RUnlock()
 
 		err := func() error {
 			ch.l.Debug(q)
-			rows, err := ch.conn.Query(ctx, q)
+			row := ch.conn.QueryRow(ctx, q)
+			if row.Err() != nil {
+				return row.Err()
+			}
+
+			var shardCount uint64
+			err := row.Scan(&shardCount)
 			if err != nil {
 				return err
 			}
-			defer rows.Close()
 
-			var f uint64
-			for rows.Next() {
-				if err = rows.Scan(&f); err != nil {
-					return err
-				}
-				timeSeries[f] = struct{}{}
-			}
-			return rows.Err()
-		}()
-		if err == nil {
 			ch.timeSeriesRW.Lock()
-			n := len(timeSeries) - len(ch.timeSeries)
-			for f, m := range timeSeries {
-				ch.timeSeries[f] = m
+			if ch.prevShardCount != shardCount {
+				ch.l.Infof("Shard count changed from %d to %d. Resetting time series map.", ch.prevShardCount, shardCount)
+				ch.timeSeries = make(map[uint64]struct{})
 			}
+			ch.prevShardCount = shardCount
 			ch.timeSeriesRW.Unlock()
-			ch.l.Debugf("Loaded %d existing time series, %d were unknown to this instance.", len(timeSeries), n)
-		} else {
+			return nil
+		}()
+		if err != nil {
 			ch.l.Error(err)
 		}
 
@@ -253,14 +288,15 @@ func (ch *clickHouse) GetDBConn() interface{} {
 	return ch.conn
 }
 
-func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) error {
+func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metricNameToTemporality map[string]pmetric.AggregationTemporality) error {
 	// calculate fingerprints, map them to time series
 	fingerprints := make([]uint64, len(data.Timeseries))
 	timeSeries := make(map[uint64][]*prompb.Label, len(data.Timeseries))
-	fingerprintToName := make(map[uint64]string)
+	fingerprintToName := make(map[uint64]map[string]string)
 
 	for i, ts := range data.Timeseries {
 		var metricName string
+		var env string = "default"
 		labelsOverridden := make(map[string]*prompb.Label)
 		for _, label := range ts.Labels {
 			labelsOverridden[label.Name] = &prompb.Label{
@@ -270,16 +306,32 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) erro
 			if label.Name == nameLabel {
 				metricName = label.Value
 			}
+			if label.Name == semconv.AttributeDeploymentEnvironment || label.Name == sanitize(semconv.AttributeDeploymentEnvironment) {
+				env = label.Value
+			}
 		}
 		var labels []*prompb.Label
 		for _, l := range labelsOverridden {
 			labels = append(labels, l)
 		}
+		// add temporality label
+		if metricName != "" {
+			if t, ok := metricNameToTemporality[metricName]; ok {
+				labels = append(labels, &prompb.Label{
+					Name:  temporalityLabel,
+					Value: t.String(),
+				})
+			}
+		}
 		timeseries.SortLabels(labels)
 		f := timeseries.Fingerprint(labels)
 		fingerprints[i] = f
 		timeSeries[f] = labels
-		fingerprintToName[f] = metricName
+		if _, ok := fingerprintToName[f]; !ok {
+			fingerprintToName[f] = make(map[string]string)
+		}
+		fingerprintToName[f][nameLabel] = metricName
+		fingerprintToName[f][envLabel] = env
 	}
 	if len(fingerprints) != len(timeSeries) {
 		ch.l.Debugf("got %d fingerprints, but only %d of them were unique time series", len(fingerprints), len(timeSeries))
@@ -304,7 +356,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) erro
 		return err
 	}
 
-	statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (metric_name, timestamp_ms, fingerprint, labels) VALUES (?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE))
+	statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (metric_name, temporality, timestamp_ms, fingerprint, labels) VALUES (?, ?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE))
 	if err != nil {
 		return err
 	}
@@ -312,7 +364,8 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) erro
 	for fingerprint, labels := range newTimeSeries {
 		encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128)))
 		err = statement.Append(
-			fingerprintToName[fingerprint],
+			fingerprintToName[fingerprint][nameLabel],
+			metricNameToTemporality[fingerprintToName[fingerprint][nameLabel]].String(),
 			timestamp,
 			fingerprint,
 			encodedLabels,
@@ -336,6 +389,48 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) erro
 		return err
 	}
 
+	// Write to time_series_v3 table
+	err = func() error {
+		ctx := context.Background()
+		err := ch.conn.Exec(ctx, `SET allow_experimental_object_type = 1`)
+		if err != nil {
+			return err
+		}
+
+		statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, timestamp_ms, labels) VALUES (?, ?, ?, ?, ?, ?)", ch.database, TIME_SERIES_TABLE_V3))
+		if err != nil {
+			return err
+		}
+		timestamp := model.Now().Time().UnixMilli()
+		for fingerprint, labels := range newTimeSeries {
+			encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128)))
+			err = statement.Append(
+				fingerprintToName[fingerprint][envLabel],
+				metricNameToTemporality[fingerprintToName[fingerprint][nameLabel]].String(),
+				fingerprintToName[fingerprint][nameLabel],
+				fingerprint,
+				timestamp,
+				encodedLabels,
+			)
+			if err != nil {
+				return err
+			}
+		}
+
+		start := time.Now()
+		err = statement.Send()
+		ctx, _ = tag.New(ctx,
+			tag.Upsert(exporterKey, string(component.DataTypeMetrics)),
+			tag.Upsert(tableKey, TIME_SERIES_TABLE_V3),
+		)
+		stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
+		return err
+	}()
+
+	if err != nil {
+		return err
+	}
+
 	metrics := map[string]usage.Metric{}
 	err = func() error {
 		ctx := context.Background()
@@ -367,7 +462,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) erro
 			}
 
 			err = statement.Append(
-				fingerprintToName[fingerprint],
+				fingerprintToName[fingerprint][nameLabel],
 				fingerprint,
 				s.Timestamp,
 				s.Value,
diff --git a/exporter/clickhousemetricsexporter/config.go b/exporter/clickhousemetricsexporter/config.go
index be05ed3d..75d5f6e1 100644
--- a/exporter/clickhousemetricsexporter/config.go
+++ b/exporter/clickhousemetricsexporter/config.go
@@ -16,6 +16,7 @@ package clickhousemetricsexporter
 
 import (
 	"fmt"
+	"time"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry"
 	"go.opentelemetry.io/collector/component"
@@ -43,6 +44,8 @@ type Config struct {
 	// "Enabled" - A boolean field to enable/disable this option. Default is `false`.
 	// If enabled, all the resource attributes will be converted to metric labels by default.
 	ResourceToTelemetrySettings resourcetotelemetry.Settings `mapstructure:"resource_to_telemetry_conversion"`
+
+	WatcherInterval time.Duration `mapstructure:"watcher_interval"`
 }
 
 // RemoteWriteQueue allows to configure the remote write queue.
diff --git a/exporter/clickhousemetricsexporter/config_test.go b/exporter/clickhousemetricsexporter/config_test.go
index 21614687..c88305d4 100644
--- a/exporter/clickhousemetricsexporter/config_test.go
+++ b/exporter/clickhousemetricsexporter/config_test.go
@@ -82,6 +82,7 @@ func Test_loadConfig(t *testing.T) {
 			},
 		},
 		ResourceToTelemetrySettings: resourcetotelemetry.Settings{Enabled: true},
+		WatcherInterval:             30 * time.Second,
 	})
 }
diff --git a/exporter/clickhousemetricsexporter/exporter.go b/exporter/clickhousemetricsexporter/exporter.go
index e6b3cbc0..45039c0f 100644
--- a/exporter/clickhousemetricsexporter/exporter.go
+++ b/exporter/clickhousemetricsexporter/exporter.go
@@ -29,7 +29,6 @@ import (
 	"github.com/pkg/errors"
 	"go.opencensus.io/stats/view"
 	"go.uber.org/multierr"
-	"go.uber.org/zap"
 
 	"github.com/prometheus/prometheus/prompb"
 
@@ -47,18 +46,20 @@ const maxBatchByteSize = 3000000
 
 // PrwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint.
 type PrwExporter struct {
-	namespace       string
-	externalLabels  map[string]string
-	endpointURL     *url.URL
-	client          *http.Client
-	wg              *sync.WaitGroup
-	closeChan       chan struct{}
-	concurrency     int
-	userAgentHeader string
-	clientSettings  *confighttp.HTTPClientSettings
-	settings        component.TelemetrySettings
-	ch              base.Storage
-	usageCollector  *usage.UsageCollector
+	namespace               string
+	externalLabels          map[string]string
+	endpointURL             *url.URL
+	client                  *http.Client
+	wg                      *sync.WaitGroup
+	closeChan               chan struct{}
+	concurrency             int
+	userAgentHeader         string
+	clientSettings          *confighttp.HTTPClientSettings
+	settings                component.TelemetrySettings
+	ch                      base.Storage
+	usageCollector          *usage.UsageCollector
+	metricNameToTemporality map[string]pmetric.AggregationTemporality
+	mux                     *sync.Mutex
 }
 
 // NewPrwExporter initializes a new PrwExporter instance and sets fields accordingly.
@@ -82,6 +83,7 @@ func NewPrwExporter(cfg *Config, set exporter.CreateSettings) (*PrwExporter, err
 		DropDatabase:         false,
 		MaxOpenConns:         75,
 		MaxTimeSeriesInQuery: 50,
+		WatcherInterval:      cfg.WatcherInterval,
 	}
 	ch, err := NewClickHouse(params)
 	if err != nil {
@@ -106,17 +108,19 @@ func NewPrwExporter(cfg *Config, set exporter.CreateSettings) (*PrwExporter, err
 	}
 
 	return &PrwExporter{
-		namespace:       cfg.Namespace,
-		externalLabels:  sanitizedLabels,
-		endpointURL:     endpointURL,
-		wg:              new(sync.WaitGroup),
-		closeChan:       make(chan struct{}),
-		userAgentHeader: userAgentHeader,
-		concurrency:     cfg.RemoteWriteQueue.NumConsumers,
-		clientSettings:  &cfg.HTTPClientSettings,
-		settings:        set.TelemetrySettings,
-		ch:              ch,
-		usageCollector:  collector,
+		namespace:               cfg.Namespace,
+		externalLabels:          sanitizedLabels,
+		endpointURL:             endpointURL,
+		wg:                      new(sync.WaitGroup),
+		closeChan:               make(chan struct{}),
+		userAgentHeader:         userAgentHeader,
+		concurrency:             cfg.RemoteWriteQueue.NumConsumers,
+		clientSettings:          &cfg.HTTPClientSettings,
+		settings:                set.TelemetrySettings,
+		ch:                      ch,
+		usageCollector:          collector,
+		metricNameToTemporality: make(map[string]pmetric.AggregationTemporality),
+		mux:                     new(sync.Mutex),
 	}, nil
 }
 
@@ -166,37 +170,31 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
 		// TODO: decide if scope information should be exported as labels
 		for k := 0; k < metricSlice.Len(); k++ {
 			metric := metricSlice.At(k)
+			var temporality pmetric.AggregationTemporality
 
-			// check for valid type and temporality combination and for matching data field and type
-			if ok := validateMetrics(metric); !ok {
-				dropped++
-				errs = multierr.Append(errs, consumererror.NewPermanent(errors.New("invalid temporality and type combination")))
-				serviceName, found := resource.Attributes().Get("service.name")
-				if !found {
-					serviceName = pcommon.NewValueStr("")
-				}
-				metricType := metric.Type()
-				var numDataPoints int
-				var temporality pmetric.AggregationTemporality
-				switch metricType {
-				case pmetric.MetricTypeGauge:
-					numDataPoints = metric.Gauge().DataPoints().Len()
-				case pmetric.MetricTypeSum:
-					numDataPoints = metric.Sum().DataPoints().Len()
-					temporality = metric.Sum().AggregationTemporality()
-				case pmetric.MetricTypeHistogram:
-					numDataPoints = metric.Histogram().DataPoints().Len()
-					temporality = metric.Histogram().AggregationTemporality()
-				case pmetric.MetricTypeSummary:
-					numDataPoints = metric.Summary().DataPoints().Len()
-				default:
-				}
-				prwe.settings.Logger.Error("unsupported metric type and temporality combination", zap.Int("numDataPoints", numDataPoints), zap.Any("metricType", metricType), zap.Any("temporality", temporality), zap.String("service name", serviceName.AsString()))
-				continue
+			metricType := metric.Type()
+
+			switch metricType {
+			case pmetric.MetricTypeGauge:
+				temporality = pmetric.AggregationTemporalityUnspecified
+			case pmetric.MetricTypeSum:
+				temporality = metric.Sum().AggregationTemporality()
+			case pmetric.MetricTypeHistogram:
+				temporality = metric.Histogram().AggregationTemporality()
+			case pmetric.MetricTypeSummary:
+				temporality = pmetric.AggregationTemporalityUnspecified
+			default:
+			}
+			prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)] = temporality
+
+			if metricType == pmetric.MetricTypeHistogram || metricType == pmetric.MetricTypeSummary {
+				prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)+bucketStr] = temporality
+				prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)+countStr] = temporality
+				prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)+sumStr] = temporality
 			}
 
 			// handle individual metric based on type
-			switch metric.Type() {
+			switch metricType {
 			case pmetric.MetricTypeGauge:
 				dataPoints := metric.Gauge().DataPoints()
 				if err := prwe.addNumberDataPointSlice(dataPoints, tsMap, resource, metric); err != nil {
@@ -279,6 +277,13 @@ func (prwe *PrwExporter) addNumberDataPointSlice(dataPoints pmetric.NumberDataPo
 
 // export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order
 func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) []error {
+	prwe.mux.Lock()
+	// make a copy of metricNameToTemporality
+	metricNameToTemporality := make(map[string]pmetric.AggregationTemporality)
+	for k, v := range prwe.metricNameToTemporality {
+		metricNameToTemporality[k] = v
+	}
+	prwe.mux.Unlock()
 	var errs []error
 	// Calls the helper function to convert and batch the TsMap to the desired format
 	requests, err := batchTimeSeries(tsMap, maxBatchByteSize)
@@ -306,7 +311,7 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti
 			defer wg.Done()
 			for request := range input {
-				err := prwe.ch.Write(ctx, request)
+				err := prwe.ch.Write(ctx, request, metricNameToTemporality)
 				if err != nil {
 					mu.Lock()
 					errs = append(errs, err)
diff --git a/exporter/clickhousemetricsexporter/factory.go b/exporter/clickhousemetricsexporter/factory.go
index 47ebbe54..ce6a7f40 100644
--- a/exporter/clickhousemetricsexporter/factory.go
+++ b/exporter/clickhousemetricsexporter/factory.go
@@ -128,5 +128,6 @@ func createDefaultConfig() component.Config {
 			QueueSize:    10000,
 			NumConsumers: 5,
 		},
+		WatcherInterval: 30 * time.Second,
 	}
 }
diff --git a/exporter/clickhousemetricsexporter/helper.go b/exporter/clickhousemetricsexporter/helper.go
index fe5bdbf7..8b9c1649 100644
--- a/exporter/clickhousemetricsexporter/helper.go
+++ b/exporter/clickhousemetricsexporter/helper.go
@@ -69,9 +69,9 @@ func validateMetrics(metric pmetric.Metric) bool {
 	case pmetric.MetricTypeGauge:
 		return metric.Gauge().DataPoints().Len() != 0
 	case pmetric.MetricTypeSum:
-		return metric.Sum().DataPoints().Len() != 0 && metric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
+		return metric.Sum().DataPoints().Len() != 0
 	case pmetric.MetricTypeHistogram:
-		return metric.Histogram().DataPoints().Len() != 0 && metric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
+		return metric.Histogram().DataPoints().Len() != 0
 	case pmetric.MetricTypeSummary:
 		return metric.Summary().DataPoints().Len() != 0
 	}
diff --git a/exporter/clickhousetracesexporter/clickhouse_exporter.go b/exporter/clickhousetracesexporter/clickhouse_exporter.go
index 27c40ce9..2f46a20e 100644
--- a/exporter/clickhousetracesexporter/clickhouse_exporter.go
+++ b/exporter/clickhousetracesexporter/clickhouse_exporter.go
@@ -428,21 +428,21 @@ func (s *storage) Shutdown(_ context.Context) error {
 func extractSpanAttributesFromSpanIndex(span *Span) []SpanAttribute {
 	spanAttributes := []SpanAttribute{}
 	spanAttributes = append(spanAttributes, SpanAttribute{
-		Key:         "traceId",
+		Key:         "traceID",
 		TagType:     "tag",
 		IsColumn:    true,
 		DataType:    "string",
 		StringValue: span.TraceId,
 	})
 	spanAttributes = append(spanAttributes, SpanAttribute{
-		Key:         "spanId",
+		Key:         "spanID",
 		TagType:     "tag",
 		IsColumn:    true,
 		DataType:    "string",
 		StringValue: span.SpanId,
 	})
 	spanAttributes = append(spanAttributes, SpanAttribute{
-		Key:         "parentSpanId",
+		Key:         "parentSpanID",
"parentSpanID", TagType: "tag", IsColumn: true, DataType: "string", diff --git a/exporter/clickhousetracesexporter/writer.go b/exporter/clickhousetracesexporter/writer.go index ac386c5e..f8ea7e45 100644 --- a/exporter/clickhousetracesexporter/writer.go +++ b/exporter/clickhousetracesexporter/writer.go @@ -318,7 +318,7 @@ func (w *SpanWriter) writeTagBatch(batchSpans []*Span) error { ) if err != nil { logBatch := batchSpans[:int(math.Min(10, float64(len(batchSpans))))] - w.logger.Error("Could not write to span attributes table due to error: ", zap.Error(err),zap.Any("batch", logBatch)) + w.logger.Error("Could not write to span attributes table due to error: ", zap.Error(err), zap.Any("batch", logBatch)) return err } @@ -404,8 +404,10 @@ func (w *SpanWriter) writeModelBatch(batchSpans []*Span) error { metrics := map[string]usage.Metric{} for _, span := range batchSpans { var serialized []byte - + usageMap := span.TraceModel + usageMap.TagMap = map[string]string{} serialized, err = json.Marshal(span.TraceModel) + serializedUsage, err := json.Marshal(usageMap) if err != nil { return err @@ -417,7 +419,7 @@ func (w *SpanWriter) writeModelBatch(batchSpans []*Span) error { return err } - usage.AddMetric(metrics, *span.Tenant, 1, int64(len(serialized))) + usage.AddMetric(metrics, *span.Tenant, 1, int64(len(serializedUsage))) } start := time.Now() diff --git a/processor/signozspanmetricsprocessor/processor.go b/processor/signozspanmetricsprocessor/processor.go index 33fa30eb..c6ba2dc2 100644 --- a/processor/signozspanmetricsprocessor/processor.go +++ b/processor/signozspanmetricsprocessor/processor.go @@ -302,7 +302,7 @@ func (p *processorImp) shouldSkip(serviceName string, span ptrace.Span, resource // Start implements the component.Component interface. func (p *processorImp) Start(ctx context.Context, host component.Host) error { - p.logger.Info("Starting signozspanmetricsprocessor") + p.logger.Info("Starting signozspanmetricsprocessor with config", zap.Any("config", p.config)) exporters := host.GetExporters() var availableMetricsExporters []string @@ -461,12 +461,14 @@ func (p *processorImp) collectDBCallMetrics(ilm pmetric.ScopeMetrics) error { mDBCallSum := ilm.Metrics().AppendEmpty() mDBCallSum.SetName("signoz_db_latency_sum") mDBCallSum.SetUnit("1") - mDBCallSum.SetEmptySum().SetAggregationTemporality(p.config.GetAggregationTemporality()) + mDBCallSum.SetEmptySum().SetIsMonotonic(true) + mDBCallSum.Sum().SetAggregationTemporality(p.config.GetAggregationTemporality()) mDBCallCount := ilm.Metrics().AppendEmpty() mDBCallCount.SetName("signoz_db_latency_count") mDBCallCount.SetUnit("1") - mDBCallCount.SetEmptySum().SetAggregationTemporality(p.config.GetAggregationTemporality()) + mDBCallCount.SetEmptySum().SetIsMonotonic(true) + mDBCallCount.Sum().SetAggregationTemporality(p.config.GetAggregationTemporality()) callSumDps := mDBCallSum.Sum().DataPoints() callCountDps := mDBCallCount.Sum().DataPoints() @@ -501,12 +503,14 @@ func (p *processorImp) collectExternalCallMetrics(ilm pmetric.ScopeMetrics) erro mExternalCallSum := ilm.Metrics().AppendEmpty() mExternalCallSum.SetName("signoz_external_call_latency_sum") mExternalCallSum.SetUnit("1") - mExternalCallSum.SetEmptySum().SetAggregationTemporality(p.config.GetAggregationTemporality()) + mExternalCallSum.SetEmptySum().SetIsMonotonic(true) + mExternalCallSum.Sum().SetAggregationTemporality(p.config.GetAggregationTemporality()) mExternalCallCount := ilm.Metrics().AppendEmpty() mExternalCallCount.SetName("signoz_external_call_latency_count") 
 	mExternalCallCount.SetUnit("1")
-	mExternalCallCount.SetEmptySum().SetAggregationTemporality(p.config.GetAggregationTemporality())
+	mExternalCallCount.SetEmptySum().SetIsMonotonic(true)
+	mExternalCallCount.Sum().SetAggregationTemporality(p.config.GetAggregationTemporality())
 
 	callSumDps := mExternalCallSum.Sum().DataPoints()
 	callCountDps := mExternalCallCount.Sum().DataPoints()
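Note on the temporality values written above: the exporter stores t.String() both as the __temporality__ label and as the temporality column, and the column default of 'Unspecified' together with the SET(3) index points to exactly three possible strings. The standalone Go sketch below (not part of the patch) prints those values, assuming pmetric's string representation is "Unspecified", "Delta", and "Cumulative", which is what the schema default suggests.

package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pmetric"
)

func main() {
	// Gauges and summaries are recorded as Unspecified; sums and histograms carry
	// their own temporality. These are the three values the temporality column
	// default and the SET(3) index in the DDL above anticipate.
	for _, t := range []pmetric.AggregationTemporality{
		pmetric.AggregationTemporalityUnspecified,
		pmetric.AggregationTemporalityDelta,
		pmetric.AggregationTemporalityCumulative,
	} {
		fmt.Println(t.String())
	}
}

With those values in place, queries against the new time_series_v3 table can filter on env, temporality, and metric_name in the same order as its primary key and sharding expression.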