From 8d74730821cc6a88389a1bf464f0702ed25b9266 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Thu, 13 Jul 2023 18:08:16 +0530 Subject: [PATCH] Add experimental support for delta temporality (#121) --- .../clickhousemetricsexporter/base/base.go | 3 +- .../clickhousemetricsexporter/clickhouse.go | 123 ++++++++++++++++-- .../clickhousemetricsexporter/exporter.go | 108 +++++++-------- exporter/clickhousemetricsexporter/helper.go | 4 +- .../signozspanmetricsprocessor/processor.go | 14 +- 5 files changed, 178 insertions(+), 74 deletions(-) diff --git a/exporter/clickhousemetricsexporter/base/base.go b/exporter/clickhousemetricsexporter/base/base.go index 0a3cd2f1..693dd44a 100644 --- a/exporter/clickhousemetricsexporter/base/base.go +++ b/exporter/clickhousemetricsexporter/base/base.go @@ -23,6 +23,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "go.opentelemetry.io/collector/pdata/pmetric" "github.com/prometheus/prometheus/prompb" ) @@ -34,7 +35,7 @@ type Storage interface { // Read(context.Context, []Query) (*prompb.ReadResponse, error) // Write puts data into storage. - Write(context.Context, *prompb.WriteRequest) error + Write(context.Context, *prompb.WriteRequest, map[string]pmetric.AggregationTemporality) error // Returns the DB conn. GetDBConn() interface{} diff --git a/exporter/clickhousemetricsexporter/clickhouse.go b/exporter/clickhousemetricsexporter/clickhouse.go index 535e20a4..6a1de323 100644 --- a/exporter/clickhousemetricsexporter/clickhouse.go +++ b/exporter/clickhousemetricsexporter/clickhouse.go @@ -31,6 +31,8 @@ import ( "go.opencensus.io/stats" "go.opencensus.io/tag" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pmetric" + semconv "go.opentelemetry.io/collector/semconv/v1.13.0" "github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/base" "github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/utils/timeseries" @@ -39,14 +41,18 @@ import ( ) const ( - namespace = "promhouse" - subsystem = "clickhouse" - nameLabel = "__name__" - CLUSTER = "cluster" - DISTRIBUTED_TIME_SERIES_TABLE = "distributed_time_series_v2" - DISTRIBUTED_SAMPLES_TABLE = "distributed_samples_v2" - TIME_SERIES_TABLE = "time_series_v2" - SAMPLES_TABLE = "samples_v2" + namespace = "promhouse" + subsystem = "clickhouse" + nameLabel = "__name__" + CLUSTER = "cluster" + DISTRIBUTED_TIME_SERIES_TABLE = "distributed_time_series_v2" + DISTRIBUTED_TIME_SERIES_TABLE_V3 = "distributed_time_series_v3" + DISTRIBUTED_SAMPLES_TABLE = "distributed_samples_v2" + TIME_SERIES_TABLE = "time_series_v2" + TIME_SERIES_TABLE_V3 = "time_series_v3" + SAMPLES_TABLE = "samples_v2" + temporalityLabel = "__temporality__" + envLabel = "env" ) // clickHouse implements storage interface for the ClickHouse. @@ -138,6 +144,35 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) { queries = append(queries, fmt.Sprintf(` ALTER TABLE %s.%s ON CLUSTER %s MODIFY SETTING ttl_only_drop_parts = 1;`, database, TIME_SERIES_TABLE, CLUSTER)) + // Add temporality column to time_series table + queries = append(queries, fmt.Sprintf(` + ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS temporality LowCardinality(String) DEFAULT 'Unspecified' CODEC(ZSTD(5))`, database, TIME_SERIES_TABLE, CLUSTER)) + + queries = append(queries, fmt.Sprintf(` + ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS temporality LowCardinality(String) DEFAULT 'Unspecified' CODEC(ZSTD(5))`, database, DISTRIBUTED_TIME_SERIES_TABLE, CLUSTER)) + + // Add set index on temporality column + queries = append(queries, fmt.Sprintf(` + ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS temporality_index temporality TYPE SET(3) GRANULARITY 1`, database, TIME_SERIES_TABLE, CLUSTER)) + + // Create a new table + queries = append(queries, fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s.%s ON CLUSTER %s ( + env LowCardinality(String) DEFAULT 'default', + temporality LowCardinality(String) DEFAULT 'Unspecified', + metric_name LowCardinality(String), + fingerprint UInt64 CODEC(Delta, ZSTD), + timestamp_ms Int64 CODEC(Delta, ZSTD), + labels String CODEC(ZSTD(5)) + ) + ENGINE = ReplacingMergeTree + PARTITION BY toDate(timestamp_ms / 1000) + ORDER BY (env, temporality, metric_name, fingerprint);`, database, TIME_SERIES_TABLE_V3, CLUSTER)) + + // Create a new distributed table + queries = append(queries, fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s.%s ON CLUSTER %s AS %s.%s ENGINE = Distributed("%s", %s, %s, cityHash64(env, temporality, metric_name, fingerprint));`, database, DISTRIBUTED_TIME_SERIES_TABLE_V3, CLUSTER, database, TIME_SERIES_TABLE_V3, CLUSTER, database, TIME_SERIES_TABLE_V3)) + options := &clickhouse.Options{ Addr: []string{dsnURL.Host}, } @@ -253,14 +288,15 @@ func (ch *clickHouse) GetDBConn() interface{} { return ch.conn } -func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) error { +func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metricNameToTemporality map[string]pmetric.AggregationTemporality) error { // calculate fingerprints, map them to time series fingerprints := make([]uint64, len(data.Timeseries)) timeSeries := make(map[uint64][]*prompb.Label, len(data.Timeseries)) - fingerprintToName := make(map[uint64]string) + fingerprintToName := make(map[uint64]map[string]string) for i, ts := range data.Timeseries { var metricName string + var env string = "default" labelsOverridden := make(map[string]*prompb.Label) for _, label := range ts.Labels { labelsOverridden[label.Name] = &prompb.Label{ @@ -270,16 +306,32 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) erro if label.Name == nameLabel { metricName = label.Value } + if label.Name == semconv.AttributeDeploymentEnvironment || label.Name == sanitize(semconv.AttributeDeploymentEnvironment) { + env = label.Value + } } var labels []*prompb.Label for _, l := range labelsOverridden { labels = append(labels, l) } + // add temporality label + if metricName != "" { + if t, ok := metricNameToTemporality[metricName]; ok { + labels = append(labels, &prompb.Label{ + Name: temporalityLabel, + Value: t.String(), + }) + } + } timeseries.SortLabels(labels) f := timeseries.Fingerprint(labels) fingerprints[i] = f timeSeries[f] = labels - fingerprintToName[f] = metricName + if _, ok := fingerprintToName[f]; !ok { + fingerprintToName[f] = make(map[string]string) + } + fingerprintToName[f][nameLabel] = metricName + fingerprintToName[f][env] = env } if len(fingerprints) != len(timeSeries) { ch.l.Debugf("got %d fingerprints, but only %d of them were unique time series", len(fingerprints), len(timeSeries)) @@ -304,7 +356,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) erro return err } - statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (metric_name, timestamp_ms, fingerprint, labels) VALUES (?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE)) + statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (metric_name, temporality, timestamp_ms, fingerprint, labels) VALUES (?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE)) if err != nil { return err } @@ -312,7 +364,8 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) erro for fingerprint, labels := range newTimeSeries { encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128))) err = statement.Append( - fingerprintToName[fingerprint], + fingerprintToName[fingerprint][nameLabel], + metricNameToTemporality[fingerprintToName[fingerprint][nameLabel]].String(), timestamp, fingerprint, encodedLabels, @@ -336,6 +389,48 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) erro return err } + // Write to time_series_v3 table + err = func() error { + ctx := context.Background() + err := ch.conn.Exec(ctx, `SET allow_experimental_object_type = 1`) + if err != nil { + return err + } + + statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, timestamp_ms, labels) VALUES (?, ?, ?, ?, ?, ?)", ch.database, TIME_SERIES_TABLE_V3)) + if err != nil { + return err + } + timestamp := model.Now().Time().UnixMilli() + for fingerprint, labels := range newTimeSeries { + encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128))) + err = statement.Append( + fingerprintToName[fingerprint][envLabel], + metricNameToTemporality[fingerprintToName[fingerprint][nameLabel]].String(), + fingerprintToName[fingerprint][nameLabel], + fingerprint, + timestamp, + encodedLabels, + ) + if err != nil { + return err + } + } + + start := time.Now() + err = statement.Send() + ctx, _ = tag.New(ctx, + tag.Upsert(exporterKey, string(component.DataTypeMetrics)), + tag.Upsert(tableKey, TIME_SERIES_TABLE_V3), + ) + stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds()))) + return err + }() + + if err != nil { + return err + } + metrics := map[string]usage.Metric{} err = func() error { ctx := context.Background() @@ -367,7 +462,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) erro } err = statement.Append( - fingerprintToName[fingerprint], + fingerprintToName[fingerprint][nameLabel], fingerprint, s.Timestamp, s.Value, diff --git a/exporter/clickhousemetricsexporter/exporter.go b/exporter/clickhousemetricsexporter/exporter.go index 418c76ae..45039c0f 100644 --- a/exporter/clickhousemetricsexporter/exporter.go +++ b/exporter/clickhousemetricsexporter/exporter.go @@ -29,7 +29,6 @@ import ( "github.com/pkg/errors" "go.opencensus.io/stats/view" "go.uber.org/multierr" - "go.uber.org/zap" "github.com/prometheus/prometheus/prompb" @@ -47,18 +46,20 @@ const maxBatchByteSize = 3000000 // PrwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint. type PrwExporter struct { - namespace string - externalLabels map[string]string - endpointURL *url.URL - client *http.Client - wg *sync.WaitGroup - closeChan chan struct{} - concurrency int - userAgentHeader string - clientSettings *confighttp.HTTPClientSettings - settings component.TelemetrySettings - ch base.Storage - usageCollector *usage.UsageCollector + namespace string + externalLabels map[string]string + endpointURL *url.URL + client *http.Client + wg *sync.WaitGroup + closeChan chan struct{} + concurrency int + userAgentHeader string + clientSettings *confighttp.HTTPClientSettings + settings component.TelemetrySettings + ch base.Storage + usageCollector *usage.UsageCollector + metricNameToTemporality map[string]pmetric.AggregationTemporality + mux *sync.Mutex } // NewPrwExporter initializes a new PrwExporter instance and sets fields accordingly. @@ -107,17 +108,19 @@ func NewPrwExporter(cfg *Config, set exporter.CreateSettings) (*PrwExporter, err } return &PrwExporter{ - namespace: cfg.Namespace, - externalLabels: sanitizedLabels, - endpointURL: endpointURL, - wg: new(sync.WaitGroup), - closeChan: make(chan struct{}), - userAgentHeader: userAgentHeader, - concurrency: cfg.RemoteWriteQueue.NumConsumers, - clientSettings: &cfg.HTTPClientSettings, - settings: set.TelemetrySettings, - ch: ch, - usageCollector: collector, + namespace: cfg.Namespace, + externalLabels: sanitizedLabels, + endpointURL: endpointURL, + wg: new(sync.WaitGroup), + closeChan: make(chan struct{}), + userAgentHeader: userAgentHeader, + concurrency: cfg.RemoteWriteQueue.NumConsumers, + clientSettings: &cfg.HTTPClientSettings, + settings: set.TelemetrySettings, + ch: ch, + usageCollector: collector, + metricNameToTemporality: make(map[string]pmetric.AggregationTemporality), + mux: new(sync.Mutex), }, nil } @@ -167,37 +170,31 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er // TODO: decide if scope information should be exported as labels for k := 0; k < metricSlice.Len(); k++ { metric := metricSlice.At(k) + var temporality pmetric.AggregationTemporality - // check for valid type and temporality combination and for matching data field and type - if ok := validateMetrics(metric); !ok { - dropped++ - errs = multierr.Append(errs, consumererror.NewPermanent(errors.New("invalid temporality and type combination"))) - serviceName, found := resource.Attributes().Get("service.name") - if !found { - serviceName = pcommon.NewValueStr("") - } - metricType := metric.Type() - var numDataPoints int - var temporality pmetric.AggregationTemporality - switch metricType { - case pmetric.MetricTypeGauge: - numDataPoints = metric.Gauge().DataPoints().Len() - case pmetric.MetricTypeSum: - numDataPoints = metric.Sum().DataPoints().Len() - temporality = metric.Sum().AggregationTemporality() - case pmetric.MetricTypeHistogram: - numDataPoints = metric.Histogram().DataPoints().Len() - temporality = metric.Histogram().AggregationTemporality() - case pmetric.MetricTypeSummary: - numDataPoints = metric.Summary().DataPoints().Len() - default: - } - prwe.settings.Logger.Error("unsupported metric type and temporality combination", zap.Int("numDataPoints", numDataPoints), zap.Any("metricType", metricType), zap.Any("temporality", temporality), zap.String("service name", serviceName.AsString())) - continue + metricType := metric.Type() + + switch metricType { + case pmetric.MetricTypeGauge: + temporality = pmetric.AggregationTemporalityUnspecified + case pmetric.MetricTypeSum: + temporality = metric.Sum().AggregationTemporality() + case pmetric.MetricTypeHistogram: + temporality = metric.Histogram().AggregationTemporality() + case pmetric.MetricTypeSummary: + temporality = pmetric.AggregationTemporalityUnspecified + default: + } + prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)] = temporality + + if metricType == pmetric.MetricTypeHistogram || metricType == pmetric.MetricTypeSummary { + prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)+bucketStr] = temporality + prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)+countStr] = temporality + prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)+sumStr] = temporality } // handle individual metric based on type - switch metric.Type() { + switch metricType { case pmetric.MetricTypeGauge: dataPoints := metric.Gauge().DataPoints() if err := prwe.addNumberDataPointSlice(dataPoints, tsMap, resource, metric); err != nil { @@ -280,6 +277,13 @@ func (prwe *PrwExporter) addNumberDataPointSlice(dataPoints pmetric.NumberDataPo // export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) []error { + prwe.mux.Lock() + // make a copy of metricNameToTemporality + metricNameToTemporality := make(map[string]pmetric.AggregationTemporality) + for k, v := range prwe.metricNameToTemporality { + metricNameToTemporality[k] = v + } + prwe.mux.Unlock() var errs []error // Calls the helper function to convert and batch the TsMap to the desired format requests, err := batchTimeSeries(tsMap, maxBatchByteSize) @@ -307,7 +311,7 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti defer wg.Done() for request := range input { - err := prwe.ch.Write(ctx, request) + err := prwe.ch.Write(ctx, request, metricNameToTemporality) if err != nil { mu.Lock() errs = append(errs, err) diff --git a/exporter/clickhousemetricsexporter/helper.go b/exporter/clickhousemetricsexporter/helper.go index fe5bdbf7..8b9c1649 100644 --- a/exporter/clickhousemetricsexporter/helper.go +++ b/exporter/clickhousemetricsexporter/helper.go @@ -69,9 +69,9 @@ func validateMetrics(metric pmetric.Metric) bool { case pmetric.MetricTypeGauge: return metric.Gauge().DataPoints().Len() != 0 case pmetric.MetricTypeSum: - return metric.Sum().DataPoints().Len() != 0 && metric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative + return metric.Sum().DataPoints().Len() != 0 case pmetric.MetricTypeHistogram: - return metric.Histogram().DataPoints().Len() != 0 && metric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative + return metric.Histogram().DataPoints().Len() != 0 case pmetric.MetricTypeSummary: return metric.Summary().DataPoints().Len() != 0 } diff --git a/processor/signozspanmetricsprocessor/processor.go b/processor/signozspanmetricsprocessor/processor.go index 33fa30eb..c6ba2dc2 100644 --- a/processor/signozspanmetricsprocessor/processor.go +++ b/processor/signozspanmetricsprocessor/processor.go @@ -302,7 +302,7 @@ func (p *processorImp) shouldSkip(serviceName string, span ptrace.Span, resource // Start implements the component.Component interface. func (p *processorImp) Start(ctx context.Context, host component.Host) error { - p.logger.Info("Starting signozspanmetricsprocessor") + p.logger.Info("Starting signozspanmetricsprocessor with config", zap.Any("config", p.config)) exporters := host.GetExporters() var availableMetricsExporters []string @@ -461,12 +461,14 @@ func (p *processorImp) collectDBCallMetrics(ilm pmetric.ScopeMetrics) error { mDBCallSum := ilm.Metrics().AppendEmpty() mDBCallSum.SetName("signoz_db_latency_sum") mDBCallSum.SetUnit("1") - mDBCallSum.SetEmptySum().SetAggregationTemporality(p.config.GetAggregationTemporality()) + mDBCallSum.SetEmptySum().SetIsMonotonic(true) + mDBCallSum.Sum().SetAggregationTemporality(p.config.GetAggregationTemporality()) mDBCallCount := ilm.Metrics().AppendEmpty() mDBCallCount.SetName("signoz_db_latency_count") mDBCallCount.SetUnit("1") - mDBCallCount.SetEmptySum().SetAggregationTemporality(p.config.GetAggregationTemporality()) + mDBCallCount.SetEmptySum().SetIsMonotonic(true) + mDBCallCount.Sum().SetAggregationTemporality(p.config.GetAggregationTemporality()) callSumDps := mDBCallSum.Sum().DataPoints() callCountDps := mDBCallCount.Sum().DataPoints() @@ -501,12 +503,14 @@ func (p *processorImp) collectExternalCallMetrics(ilm pmetric.ScopeMetrics) erro mExternalCallSum := ilm.Metrics().AppendEmpty() mExternalCallSum.SetName("signoz_external_call_latency_sum") mExternalCallSum.SetUnit("1") - mExternalCallSum.SetEmptySum().SetAggregationTemporality(p.config.GetAggregationTemporality()) + mExternalCallSum.SetEmptySum().SetIsMonotonic(true) + mExternalCallSum.Sum().SetAggregationTemporality(p.config.GetAggregationTemporality()) mExternalCallCount := ilm.Metrics().AppendEmpty() mExternalCallCount.SetName("signoz_external_call_latency_count") mExternalCallCount.SetUnit("1") - mExternalCallCount.SetEmptySum().SetAggregationTemporality(p.config.GetAggregationTemporality()) + mExternalCallCount.SetEmptySum().SetIsMonotonic(true) + mExternalCallCount.Sum().SetAggregationTemporality(p.config.GetAggregationTemporality()) callSumDps := mExternalCallSum.Sum().DataPoints() callCountDps := mExternalCallCount.Sum().DataPoints()