From 6f34c4457ca7878b62929a06d1455997983dfa80 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Mon, 12 Aug 2024 13:50:34 +0530 Subject: [PATCH] Move usage to new table write block (#360) --- .../clickhousemetricsexporter/clickhouse.go | 43 +++++++++---------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/exporter/clickhousemetricsexporter/clickhouse.go b/exporter/clickhousemetricsexporter/clickhouse.go index 72ba5e06..21407e2d 100644 --- a/exporter/clickhousemetricsexporter/clickhouse.go +++ b/exporter/clickhousemetricsexporter/clickhouse.go @@ -305,7 +305,6 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr return err } - metrics := map[string]usage.Metric{} err = func() error { ctx := context.Background() @@ -318,23 +317,6 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr fingerprint := fingerprints[i] for _, s := range ts.Samples { - // usage collection checks - tenant := "default" - collectUsage := true - for _, val := range timeSeries[fingerprint] { - if val.Name == nameLabel && (strings.HasPrefix(val.Value, "signoz_") || strings.HasPrefix(val.Value, "chi_") || strings.HasPrefix(val.Value, "otelcol_")) { - collectUsage = false - break - } - if val.Name == "tenant" { - tenant = val.Value - } - } - - if collectUsage { - usage.AddMetric(metrics, tenant, 1, int64(len(s.String()))) - } - err = statement.Append( fingerprintToName[fingerprint][nameLabel], fingerprint, @@ -359,12 +341,9 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr return err } - for k, v := range metrics { - stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(usage.TagTenantKey, k), tag.Upsert(usage.TagExporterIdKey, ch.exporterID.String())}, ExporterSigNozSentMetricPoints.M(int64(v.Count)), ExporterSigNozSentMetricPointsBytes.M(int64(v.Size))) - } - // write to distributed_samples_v4 table if ch.writeTSToV4 { + metrics := map[string]usage.Metric{} err = func() error { statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, unix_milli, value) VALUES (?, ?, ?, ?, ?, ?)", ch.database, DISTRIBUTED_SAMPLES_TABLE_V4), driver.WithReleaseConnection()) if err != nil { @@ -386,6 +365,23 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr if err != nil { return err } + + // usage collection checks + tenant := "default" + collectUsage := true + for _, val := range timeSeries[fingerprint] { + if val.Name == nameLabel && (strings.HasPrefix(val.Value, "signoz_") || strings.HasPrefix(val.Value, "chi_") || strings.HasPrefix(val.Value, "otelcol_")) { + collectUsage = false + break + } + if val.Name == "tenant" { + tenant = val.Value + } + } + + if collectUsage { + usage.AddMetric(metrics, tenant, 1, int64(len(s.String()))) + } } } @@ -402,6 +398,9 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr if err != nil { return err } + for k, v := range metrics { + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(usage.TagTenantKey, k), tag.Upsert(usage.TagExporterIdKey, ch.exporterID.String())}, ExporterSigNozSentMetricPoints.M(int64(v.Count)), ExporterSigNozSentMetricPointsBytes.M(int64(v.Size))) + } } // write to distributed_time_series_v4 table