From 11d1d77262d7c03e5bd82ff261f02578bc98536e Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sat, 18 Mar 2023 04:05:33 +0530 Subject: [PATCH 1/4] delta temporality --- Makefile | 10 ++ .../clickhousemetricsexporter/base/base.go | 3 +- .../clickhousemetricsexporter/clickhouse.go | 13 ++- .../clickhousemetricsexporter/exporter.go | 102 +++++++++--------- exporter/clickhousemetricsexporter/helper.go | 4 +- 5 files changed, 75 insertions(+), 57 deletions(-) diff --git a/Makefile b/Makefile index df21a665..1b58d101 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,4 @@ +BRANCH := $(shell git rev-parse --abbrev-ref HEAD) COMMIT_SHA ?= $(shell git rev-parse HEAD) REPONAME ?= signoz IMAGE_NAME ?= "signoz-otel-collector" @@ -60,6 +61,15 @@ build-signoz-collector: --no-cache -f cmd/signozcollector/Dockerfile --progress plane \ --tag $(REPONAME)/$(IMAGE_NAME):$(DOCKER_TAG) . +.PHONY: build-signoz-collector-local +build-signoz-collector-local: + @echo "------------------" + @echo "--> Build signoz collector docker image" + @echo "------------------" + docker build --build-arg TARGETPLATFORM="linux/amd64" \ + --no-cache -f cmd/signozcollector/Dockerfile \ + --tag $(REPONAME)/$(IMAGE_NAME):$(BRANCH) . + .PHONY: lint lint: @echo "Running linters..." diff --git a/exporter/clickhousemetricsexporter/base/base.go b/exporter/clickhousemetricsexporter/base/base.go index 0a3cd2f1..693dd44a 100644 --- a/exporter/clickhousemetricsexporter/base/base.go +++ b/exporter/clickhousemetricsexporter/base/base.go @@ -23,6 +23,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "go.opentelemetry.io/collector/pdata/pmetric" "github.com/prometheus/prometheus/prompb" ) @@ -34,7 +35,7 @@ type Storage interface { // Read(context.Context, []Query) (*prompb.ReadResponse, error) // Write puts data into storage. - Write(context.Context, *prompb.WriteRequest) error + Write(context.Context, *prompb.WriteRequest, map[string]pmetric.AggregationTemporality) error // Returns the DB conn. GetDBConn() interface{} diff --git a/exporter/clickhousemetricsexporter/clickhouse.go b/exporter/clickhousemetricsexporter/clickhouse.go index f2cc9097..3a962b83 100644 --- a/exporter/clickhousemetricsexporter/clickhouse.go +++ b/exporter/clickhousemetricsexporter/clickhouse.go @@ -31,6 +31,7 @@ import ( "go.opencensus.io/stats" "go.opencensus.io/tag" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pmetric" "github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/base" "github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/utils/timeseries" @@ -135,6 +136,13 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) { queries = append(queries, fmt.Sprintf(` ALTER TABLE %s.%s ON CLUSTER %s MODIFY SETTING ttl_only_drop_parts = 1;`, database, TIME_SERIES_TABLE, CLUSTER)) + // Add temporality column to time_series table + queries = append(queries, fmt.Sprintf(` + ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS temporality String DEFAULT 'Cumulative'`, database, TIME_SERIES_TABLE, CLUSTER)) + + queries = append(queries, fmt.Sprintf(` + ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS temporality String DEFAULT 'Cumulative'`, database, DISTRIBUTED_TIME_SERIES_TABLE, CLUSTER)) + options := &clickhouse.Options{ Addr: []string{dsnURL.Host}, } @@ -253,7 +261,7 @@ func (ch *clickHouse) GetDBConn() interface{} { return ch.conn } -func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) error { +func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metricNameToTemporality map[string]pmetric.AggregationTemporality) error { // calculate fingerprints, map them to time series fingerprints := make([]uint64, len(data.Timeseries)) timeSeries := make(map[uint64][]*prompb.Label, len(data.Timeseries)) @@ -304,7 +312,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) erro return err } - statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (metric_name, timestamp_ms, fingerprint, labels) VALUES (?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE)) + statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (metric_name, temporality, timestamp_ms, fingerprint, labels) VALUES (?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE)) if err != nil { return err } @@ -313,6 +321,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) erro encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128))) err = statement.Append( fingerprintToName[fingerprint], + metricNameToTemporality[fingerprintToName[fingerprint]].String(), timestamp, fingerprint, encodedLabels, diff --git a/exporter/clickhousemetricsexporter/exporter.go b/exporter/clickhousemetricsexporter/exporter.go index c78b12d9..959da754 100644 --- a/exporter/clickhousemetricsexporter/exporter.go +++ b/exporter/clickhousemetricsexporter/exporter.go @@ -29,7 +29,6 @@ import ( "github.com/pkg/errors" "go.opencensus.io/stats/view" "go.uber.org/multierr" - "go.uber.org/zap" "github.com/prometheus/prometheus/prompb" @@ -46,18 +45,20 @@ const maxBatchByteSize = 3000000 // PrwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint. type PrwExporter struct { - namespace string - externalLabels map[string]string - endpointURL *url.URL - client *http.Client - wg *sync.WaitGroup - closeChan chan struct{} - concurrency int - userAgentHeader string - clientSettings *confighttp.HTTPClientSettings - settings component.TelemetrySettings - ch base.Storage - usageCollector *usage.UsageCollector + namespace string + externalLabels map[string]string + endpointURL *url.URL + client *http.Client + wg *sync.WaitGroup + closeChan chan struct{} + concurrency int + userAgentHeader string + clientSettings *confighttp.HTTPClientSettings + settings component.TelemetrySettings + ch base.Storage + usageCollector *usage.UsageCollector + metricNameToTemporality map[string]pmetric.AggregationTemporality + mux sync.Mutex } // NewPrwExporter initializes a new PrwExporter instance and sets fields accordingly. @@ -105,17 +106,19 @@ func NewPrwExporter(cfg *Config, set component.ExporterCreateSettings) (*PrwExpo } return &PrwExporter{ - namespace: cfg.Namespace, - externalLabels: sanitizedLabels, - endpointURL: endpointURL, - wg: new(sync.WaitGroup), - closeChan: make(chan struct{}), - userAgentHeader: userAgentHeader, - concurrency: cfg.RemoteWriteQueue.NumConsumers, - clientSettings: &cfg.HTTPClientSettings, - settings: set.TelemetrySettings, - ch: ch, - usageCollector: collector, + namespace: cfg.Namespace, + externalLabels: sanitizedLabels, + endpointURL: endpointURL, + wg: new(sync.WaitGroup), + closeChan: make(chan struct{}), + userAgentHeader: userAgentHeader, + concurrency: cfg.RemoteWriteQueue.NumConsumers, + clientSettings: &cfg.HTTPClientSettings, + settings: set.TelemetrySettings, + ch: ch, + usageCollector: collector, + metricNameToTemporality: make(map[string]pmetric.AggregationTemporality), + mux: sync.Mutex{}, }, nil } @@ -165,37 +168,25 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er // TODO: decide if scope information should be exported as labels for k := 0; k < metricSlice.Len(); k++ { metric := metricSlice.At(k) + var temporality pmetric.AggregationTemporality - // check for valid type and temporality combination and for matching data field and type - if ok := validateMetrics(metric); !ok { - dropped++ - errs = multierr.Append(errs, consumererror.NewPermanent(errors.New("invalid temporality and type combination"))) - serviceName, found := resource.Attributes().Get("service.name") - if !found { - serviceName = pcommon.NewValueStr("") - } - metricType := metric.Type() - var numDataPoints int - var temporality pmetric.AggregationTemporality - switch metricType { - case pmetric.MetricTypeGauge: - numDataPoints = metric.Gauge().DataPoints().Len() - case pmetric.MetricTypeSum: - numDataPoints = metric.Sum().DataPoints().Len() - temporality = metric.Sum().AggregationTemporality() - case pmetric.MetricTypeHistogram: - numDataPoints = metric.Histogram().DataPoints().Len() - temporality = metric.Histogram().AggregationTemporality() - case pmetric.MetricTypeSummary: - numDataPoints = metric.Summary().DataPoints().Len() - default: - } - zap.S().Errorf("dropped %d number of metric data points of type %d with temporality %d for a service %s", numDataPoints, metricType, temporality, serviceName.AsString()) - continue + metricType := metric.Type() + + switch metricType { + case pmetric.MetricTypeGauge: + temporality = pmetric.AggregationTemporalityUnspecified + case pmetric.MetricTypeSum: + temporality = metric.Sum().AggregationTemporality() + case pmetric.MetricTypeHistogram: + temporality = metric.Histogram().AggregationTemporality() + case pmetric.MetricTypeSummary: + temporality = pmetric.AggregationTemporalityUnspecified + default: } + prwe.metricNameToTemporality[metric.Name()] = temporality // handle individual metric based on type - switch metric.Type() { + switch metricType { case pmetric.MetricTypeGauge: dataPoints := metric.Gauge().DataPoints() if err := prwe.addNumberDataPointSlice(dataPoints, tsMap, resource, metric); err != nil { @@ -278,6 +269,13 @@ func (prwe *PrwExporter) addNumberDataPointSlice(dataPoints pmetric.NumberDataPo // export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) []error { + prwe.mux.Lock() + // make a copy of metricNameToTemporality + metricNameToTemporality := make(map[string]pmetric.AggregationTemporality) + for k, v := range prwe.metricNameToTemporality { + metricNameToTemporality[k] = v + } + prwe.mux.Unlock() var errs []error // Calls the helper function to convert and batch the TsMap to the desired format requests, err := batchTimeSeries(tsMap, maxBatchByteSize) @@ -305,7 +303,7 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti defer wg.Done() for request := range input { - err := prwe.ch.Write(ctx, request) + err := prwe.ch.Write(ctx, request, metricNameToTemporality) if err != nil { mu.Lock() errs = append(errs, err) diff --git a/exporter/clickhousemetricsexporter/helper.go b/exporter/clickhousemetricsexporter/helper.go index fe5bdbf7..8b9c1649 100644 --- a/exporter/clickhousemetricsexporter/helper.go +++ b/exporter/clickhousemetricsexporter/helper.go @@ -69,9 +69,9 @@ func validateMetrics(metric pmetric.Metric) bool { case pmetric.MetricTypeGauge: return metric.Gauge().DataPoints().Len() != 0 case pmetric.MetricTypeSum: - return metric.Sum().DataPoints().Len() != 0 && metric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative + return metric.Sum().DataPoints().Len() != 0 case pmetric.MetricTypeHistogram: - return metric.Histogram().DataPoints().Len() != 0 && metric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative + return metric.Histogram().DataPoints().Len() != 0 case pmetric.MetricTypeSummary: return metric.Summary().DataPoints().Len() != 0 } From 246975e0edcb85279a1d32a6ccc9fb101e297afc Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sat, 18 Mar 2023 20:06:40 +0530 Subject: [PATCH 2/4] correct metrics name and fingerprint --- exporter/clickhousemetricsexporter/clickhouse.go | 10 ++++++++++ exporter/clickhousemetricsexporter/exporter.go | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/exporter/clickhousemetricsexporter/clickhouse.go b/exporter/clickhousemetricsexporter/clickhouse.go index 3a962b83..02074d03 100644 --- a/exporter/clickhousemetricsexporter/clickhouse.go +++ b/exporter/clickhousemetricsexporter/clickhouse.go @@ -48,6 +48,7 @@ const ( DISTRIBUTED_SAMPLES_TABLE = "distributed_samples_v2" TIME_SERIES_TABLE = "time_series_v2" SAMPLES_TABLE = "samples_v2" + temporalityLabel = "__temporality__" ) // clickHouse implements storage interface for the ClickHouse. @@ -283,6 +284,15 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr for _, l := range labelsOverridden { labels = append(labels, l) } + // add temporality label + if metricName != "" { + if t, ok := metricNameToTemporality[metricName]; ok { + labels = append(labels, &prompb.Label{ + Name: temporalityLabel, + Value: t.String(), + }) + } + } timeseries.SortLabels(labels) f := timeseries.Fingerprint(labels) fingerprints[i] = f diff --git a/exporter/clickhousemetricsexporter/exporter.go b/exporter/clickhousemetricsexporter/exporter.go index 959da754..bca809d4 100644 --- a/exporter/clickhousemetricsexporter/exporter.go +++ b/exporter/clickhousemetricsexporter/exporter.go @@ -183,7 +183,7 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er temporality = pmetric.AggregationTemporalityUnspecified default: } - prwe.metricNameToTemporality[metric.Name()] = temporality + prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)] = temporality // handle individual metric based on type switch metricType { From 8e1bed296d60ade7297fa8b74ded2a51356f556b Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sat, 25 Mar 2023 22:23:49 +0530 Subject: [PATCH 3/4] Update column type --- Makefile | 10 ---------- exporter/clickhousemetricsexporter/clickhouse.go | 4 ++-- exporter/clickhousemetricsexporter/exporter.go | 4 ++-- 3 files changed, 4 insertions(+), 14 deletions(-) diff --git a/Makefile b/Makefile index 1b58d101..df21a665 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,3 @@ -BRANCH := $(shell git rev-parse --abbrev-ref HEAD) COMMIT_SHA ?= $(shell git rev-parse HEAD) REPONAME ?= signoz IMAGE_NAME ?= "signoz-otel-collector" @@ -61,15 +60,6 @@ build-signoz-collector: --no-cache -f cmd/signozcollector/Dockerfile --progress plane \ --tag $(REPONAME)/$(IMAGE_NAME):$(DOCKER_TAG) . -.PHONY: build-signoz-collector-local -build-signoz-collector-local: - @echo "------------------" - @echo "--> Build signoz collector docker image" - @echo "------------------" - docker build --build-arg TARGETPLATFORM="linux/amd64" \ - --no-cache -f cmd/signozcollector/Dockerfile \ - --tag $(REPONAME)/$(IMAGE_NAME):$(BRANCH) . - .PHONY: lint lint: @echo "Running linters..." diff --git a/exporter/clickhousemetricsexporter/clickhouse.go b/exporter/clickhousemetricsexporter/clickhouse.go index 02074d03..7f313e70 100644 --- a/exporter/clickhousemetricsexporter/clickhouse.go +++ b/exporter/clickhousemetricsexporter/clickhouse.go @@ -139,10 +139,10 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) { // Add temporality column to time_series table queries = append(queries, fmt.Sprintf(` - ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS temporality String DEFAULT 'Cumulative'`, database, TIME_SERIES_TABLE, CLUSTER)) + ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS temporality Enum('Unspecified' = 0, 'Delta', 'Cumulative')`, database, TIME_SERIES_TABLE, CLUSTER)) queries = append(queries, fmt.Sprintf(` - ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS temporality String DEFAULT 'Cumulative'`, database, DISTRIBUTED_TIME_SERIES_TABLE, CLUSTER)) + ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS temporality Enum('Unspecified' = 0, 'Delta', 'Cumulative')`, database, DISTRIBUTED_TIME_SERIES_TABLE, CLUSTER)) options := &clickhouse.Options{ Addr: []string{dsnURL.Host}, diff --git a/exporter/clickhousemetricsexporter/exporter.go b/exporter/clickhousemetricsexporter/exporter.go index bca809d4..362bfed7 100644 --- a/exporter/clickhousemetricsexporter/exporter.go +++ b/exporter/clickhousemetricsexporter/exporter.go @@ -58,7 +58,7 @@ type PrwExporter struct { ch base.Storage usageCollector *usage.UsageCollector metricNameToTemporality map[string]pmetric.AggregationTemporality - mux sync.Mutex + mux *sync.Mutex } // NewPrwExporter initializes a new PrwExporter instance and sets fields accordingly. @@ -118,7 +118,7 @@ func NewPrwExporter(cfg *Config, set component.ExporterCreateSettings) (*PrwExpo ch: ch, usageCollector: collector, metricNameToTemporality: make(map[string]pmetric.AggregationTemporality), - mux: sync.Mutex{}, + mux: new(sync.Mutex), }, nil } From 925fe29d9f000b21512a7b15573b2ccd76cad803 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Mon, 10 Jul 2023 18:53:26 +0530 Subject: [PATCH 4/4] Fix monotonic, write to new table, bunch of other things --- .../clickhousemetricsexporter/clickhouse.go | 108 +++++++++++++++--- .../clickhousemetricsexporter/exporter.go | 6 + .../signozspanmetricsprocessor/processor.go | 14 ++- 3 files changed, 107 insertions(+), 21 deletions(-) diff --git a/exporter/clickhousemetricsexporter/clickhouse.go b/exporter/clickhousemetricsexporter/clickhouse.go index c60d1963..6a1de323 100644 --- a/exporter/clickhousemetricsexporter/clickhouse.go +++ b/exporter/clickhousemetricsexporter/clickhouse.go @@ -32,6 +32,7 @@ import ( "go.opencensus.io/tag" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pmetric" + semconv "go.opentelemetry.io/collector/semconv/v1.13.0" "github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/base" "github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/utils/timeseries" @@ -40,15 +41,18 @@ import ( ) const ( - namespace = "promhouse" - subsystem = "clickhouse" - nameLabel = "__name__" - CLUSTER = "cluster" - DISTRIBUTED_TIME_SERIES_TABLE = "distributed_time_series_v2" - DISTRIBUTED_SAMPLES_TABLE = "distributed_samples_v2" - TIME_SERIES_TABLE = "time_series_v2" - SAMPLES_TABLE = "samples_v2" - temporalityLabel = "__temporality__" + namespace = "promhouse" + subsystem = "clickhouse" + nameLabel = "__name__" + CLUSTER = "cluster" + DISTRIBUTED_TIME_SERIES_TABLE = "distributed_time_series_v2" + DISTRIBUTED_TIME_SERIES_TABLE_V3 = "distributed_time_series_v3" + DISTRIBUTED_SAMPLES_TABLE = "distributed_samples_v2" + TIME_SERIES_TABLE = "time_series_v2" + TIME_SERIES_TABLE_V3 = "time_series_v3" + SAMPLES_TABLE = "samples_v2" + temporalityLabel = "__temporality__" + envLabel = "env" ) // clickHouse implements storage interface for the ClickHouse. @@ -142,10 +146,32 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) { // Add temporality column to time_series table queries = append(queries, fmt.Sprintf(` - ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS temporality Enum('Unspecified' = 0, 'Delta', 'Cumulative')`, database, TIME_SERIES_TABLE, CLUSTER)) + ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS temporality LowCardinality(String) DEFAULT 'Unspecified' CODEC(ZSTD(5))`, database, TIME_SERIES_TABLE, CLUSTER)) queries = append(queries, fmt.Sprintf(` - ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS temporality Enum('Unspecified' = 0, 'Delta', 'Cumulative')`, database, DISTRIBUTED_TIME_SERIES_TABLE, CLUSTER)) + ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS temporality LowCardinality(String) DEFAULT 'Unspecified' CODEC(ZSTD(5))`, database, DISTRIBUTED_TIME_SERIES_TABLE, CLUSTER)) + + // Add set index on temporality column + queries = append(queries, fmt.Sprintf(` + ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS temporality_index temporality TYPE SET(3) GRANULARITY 1`, database, TIME_SERIES_TABLE, CLUSTER)) + + // Create a new table + queries = append(queries, fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s.%s ON CLUSTER %s ( + env LowCardinality(String) DEFAULT 'default', + temporality LowCardinality(String) DEFAULT 'Unspecified', + metric_name LowCardinality(String), + fingerprint UInt64 CODEC(Delta, ZSTD), + timestamp_ms Int64 CODEC(Delta, ZSTD), + labels String CODEC(ZSTD(5)) + ) + ENGINE = ReplacingMergeTree + PARTITION BY toDate(timestamp_ms / 1000) + ORDER BY (env, temporality, metric_name, fingerprint);`, database, TIME_SERIES_TABLE_V3, CLUSTER)) + + // Create a new distributed table + queries = append(queries, fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s.%s ON CLUSTER %s AS %s.%s ENGINE = Distributed("%s", %s, %s, cityHash64(env, temporality, metric_name, fingerprint));`, database, DISTRIBUTED_TIME_SERIES_TABLE_V3, CLUSTER, database, TIME_SERIES_TABLE_V3, CLUSTER, database, TIME_SERIES_TABLE_V3)) options := &clickhouse.Options{ Addr: []string{dsnURL.Host}, @@ -266,10 +292,11 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr // calculate fingerprints, map them to time series fingerprints := make([]uint64, len(data.Timeseries)) timeSeries := make(map[uint64][]*prompb.Label, len(data.Timeseries)) - fingerprintToName := make(map[uint64]string) + fingerprintToName := make(map[uint64]map[string]string) for i, ts := range data.Timeseries { var metricName string + var env string = "default" labelsOverridden := make(map[string]*prompb.Label) for _, label := range ts.Labels { labelsOverridden[label.Name] = &prompb.Label{ @@ -279,6 +306,9 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr if label.Name == nameLabel { metricName = label.Value } + if label.Name == semconv.AttributeDeploymentEnvironment || label.Name == sanitize(semconv.AttributeDeploymentEnvironment) { + env = label.Value + } } var labels []*prompb.Label for _, l := range labelsOverridden { @@ -297,7 +327,11 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr f := timeseries.Fingerprint(labels) fingerprints[i] = f timeSeries[f] = labels - fingerprintToName[f] = metricName + if _, ok := fingerprintToName[f]; !ok { + fingerprintToName[f] = make(map[string]string) + } + fingerprintToName[f][nameLabel] = metricName + fingerprintToName[f][env] = env } if len(fingerprints) != len(timeSeries) { ch.l.Debugf("got %d fingerprints, but only %d of them were unique time series", len(fingerprints), len(timeSeries)) @@ -330,8 +364,8 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr for fingerprint, labels := range newTimeSeries { encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128))) err = statement.Append( - fingerprintToName[fingerprint], - metricNameToTemporality[fingerprintToName[fingerprint]].String(), + fingerprintToName[fingerprint][nameLabel], + metricNameToTemporality[fingerprintToName[fingerprint][nameLabel]].String(), timestamp, fingerprint, encodedLabels, @@ -355,6 +389,48 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr return err } + // Write to time_series_v3 table + err = func() error { + ctx := context.Background() + err := ch.conn.Exec(ctx, `SET allow_experimental_object_type = 1`) + if err != nil { + return err + } + + statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, timestamp_ms, labels) VALUES (?, ?, ?, ?, ?, ?)", ch.database, TIME_SERIES_TABLE_V3)) + if err != nil { + return err + } + timestamp := model.Now().Time().UnixMilli() + for fingerprint, labels := range newTimeSeries { + encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128))) + err = statement.Append( + fingerprintToName[fingerprint][envLabel], + metricNameToTemporality[fingerprintToName[fingerprint][nameLabel]].String(), + fingerprintToName[fingerprint][nameLabel], + fingerprint, + timestamp, + encodedLabels, + ) + if err != nil { + return err + } + } + + start := time.Now() + err = statement.Send() + ctx, _ = tag.New(ctx, + tag.Upsert(exporterKey, string(component.DataTypeMetrics)), + tag.Upsert(tableKey, TIME_SERIES_TABLE_V3), + ) + stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds()))) + return err + }() + + if err != nil { + return err + } + metrics := map[string]usage.Metric{} err = func() error { ctx := context.Background() @@ -386,7 +462,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr } err = statement.Append( - fingerprintToName[fingerprint], + fingerprintToName[fingerprint][nameLabel], fingerprint, s.Timestamp, s.Value, diff --git a/exporter/clickhousemetricsexporter/exporter.go b/exporter/clickhousemetricsexporter/exporter.go index c579e55f..45039c0f 100644 --- a/exporter/clickhousemetricsexporter/exporter.go +++ b/exporter/clickhousemetricsexporter/exporter.go @@ -187,6 +187,12 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er } prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)] = temporality + if metricType == pmetric.MetricTypeHistogram || metricType == pmetric.MetricTypeSummary { + prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)+bucketStr] = temporality + prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)+countStr] = temporality + prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)+sumStr] = temporality + } + // handle individual metric based on type switch metricType { case pmetric.MetricTypeGauge: diff --git a/processor/signozspanmetricsprocessor/processor.go b/processor/signozspanmetricsprocessor/processor.go index 33fa30eb..c6ba2dc2 100644 --- a/processor/signozspanmetricsprocessor/processor.go +++ b/processor/signozspanmetricsprocessor/processor.go @@ -302,7 +302,7 @@ func (p *processorImp) shouldSkip(serviceName string, span ptrace.Span, resource // Start implements the component.Component interface. func (p *processorImp) Start(ctx context.Context, host component.Host) error { - p.logger.Info("Starting signozspanmetricsprocessor") + p.logger.Info("Starting signozspanmetricsprocessor with config", zap.Any("config", p.config)) exporters := host.GetExporters() var availableMetricsExporters []string @@ -461,12 +461,14 @@ func (p *processorImp) collectDBCallMetrics(ilm pmetric.ScopeMetrics) error { mDBCallSum := ilm.Metrics().AppendEmpty() mDBCallSum.SetName("signoz_db_latency_sum") mDBCallSum.SetUnit("1") - mDBCallSum.SetEmptySum().SetAggregationTemporality(p.config.GetAggregationTemporality()) + mDBCallSum.SetEmptySum().SetIsMonotonic(true) + mDBCallSum.Sum().SetAggregationTemporality(p.config.GetAggregationTemporality()) mDBCallCount := ilm.Metrics().AppendEmpty() mDBCallCount.SetName("signoz_db_latency_count") mDBCallCount.SetUnit("1") - mDBCallCount.SetEmptySum().SetAggregationTemporality(p.config.GetAggregationTemporality()) + mDBCallCount.SetEmptySum().SetIsMonotonic(true) + mDBCallCount.Sum().SetAggregationTemporality(p.config.GetAggregationTemporality()) callSumDps := mDBCallSum.Sum().DataPoints() callCountDps := mDBCallCount.Sum().DataPoints() @@ -501,12 +503,14 @@ func (p *processorImp) collectExternalCallMetrics(ilm pmetric.ScopeMetrics) erro mExternalCallSum := ilm.Metrics().AppendEmpty() mExternalCallSum.SetName("signoz_external_call_latency_sum") mExternalCallSum.SetUnit("1") - mExternalCallSum.SetEmptySum().SetAggregationTemporality(p.config.GetAggregationTemporality()) + mExternalCallSum.SetEmptySum().SetIsMonotonic(true) + mExternalCallSum.Sum().SetAggregationTemporality(p.config.GetAggregationTemporality()) mExternalCallCount := ilm.Metrics().AppendEmpty() mExternalCallCount.SetName("signoz_external_call_latency_count") mExternalCallCount.SetUnit("1") - mExternalCallCount.SetEmptySum().SetAggregationTemporality(p.config.GetAggregationTemporality()) + mExternalCallCount.SetEmptySum().SetIsMonotonic(true) + mExternalCallCount.Sum().SetAggregationTemporality(p.config.GetAggregationTemporality()) callSumDps := mExternalCallSum.Sum().DataPoints() callCountDps := mExternalCallCount.Sum().DataPoints()