From 140c30d2c73e5925c6460107382d85c90b0edda6 Mon Sep 17 00:00:00 2001
From: Srikanth Chekuri
Date: Thu, 8 Aug 2024 19:28:46 +0530
Subject: [PATCH 1/3] Skip write to time_series_v4 table for the same
 fingerprint in curren… (#355)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../clickhousemetricsexporter/clickhouse.go | 57 +++++--------------
 go.mod                                      |  1 +
 go.sum                                      |  2 +
 3 files changed, 18 insertions(+), 42 deletions(-)

diff --git a/exporter/clickhousemetricsexporter/clickhouse.go b/exporter/clickhousemetricsexporter/clickhouse.go
index dfcbe889..72ba5e06 100644
--- a/exporter/clickhousemetricsexporter/clickhouse.go
+++ b/exporter/clickhousemetricsexporter/clickhouse.go
@@ -28,6 +28,7 @@ import (
 	clickhouse "github.com/ClickHouse/clickhouse-go/v2"
 	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
 	"github.com/google/uuid"
+	"github.com/jellydator/ttlcache/v3"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"github.com/sirupsen/logrus"
@@ -64,6 +65,8 @@ type clickHouse struct {
 	database             string
 	maxTimeSeriesInQuery int
 
+	cache *ttlcache.Cache[string, bool]
+
 	timeSeriesRW sync.RWMutex
 	// Maintains the lookup map for fingerprints that are
 	// written to time series table. This map is used to eliminate the
@@ -113,11 +116,18 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
 		return nil, fmt.Errorf("could not connect to clickhouse: %s", err)
 	}
 
+	cache := ttlcache.New[string, bool](
+		ttlcache.WithTTL[string, bool](45*time.Minute),
+		ttlcache.WithDisableTouchOnHit[string, bool](),
+	)
+	go cache.Start()
+
 	ch := &clickHouse{
 		conn:                 conn,
 		l:                    l,
 		database:             options.Auth.Database,
 		maxTimeSeriesInQuery: params.MaxTimeSeriesInQuery,
+		cache:                cache,
 
 		timeSeries: make(map[uint64]struct{}, 8192),
 
@@ -295,48 +305,6 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
 		return err
 	}
 
-	// Write to distributed_time_series_v3 table
-	err = func() error {
-
-		statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, timestamp_ms, labels, description, unit, type, is_monotonic) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE_V3), driver.WithReleaseConnection())
-		if err != nil {
-			return err
-		}
-		timestamp := model.Now().Time().UnixMilli()
-		for fingerprint, labels := range newTimeSeries {
-			encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128)))
-			meta := metricNameToMeta[fingerprintToName[fingerprint][nameLabel]]
-			err = statement.Append(
-				fingerprintToName[fingerprint][envLabel],
-				meta.Temporality.String(),
-				fingerprintToName[fingerprint][nameLabel],
-				fingerprint,
-				timestamp,
-				encodedLabels,
-				meta.Description,
-				meta.Unit,
-				meta.Typ.String(),
-				meta.IsMonotonic,
-			)
-			if err != nil {
-				return err
-			}
-		}
-
-		start := time.Now()
-		err = statement.Send()
-		ctx, _ = tag.New(ctx,
-			tag.Upsert(exporterKey, string(component.DataTypeMetrics.String())),
-			tag.Upsert(tableKey, DISTRIBUTED_TIME_SERIES_TABLE_V3),
-		)
-		stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
-		return err
-	}()
-
-	if err != nil {
-		return err
-	}
-
 	metrics := map[string]usage.Metric{}
 	err = func() error {
 		ctx := context.Background()
@@ -447,6 +415,10 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
 
 		unixMilli := model.Now().Time().UnixMilli() / 3600000 * 3600000
 		for fingerprint, labels := range timeSeries {
+			key := fmt.Sprintf("%d:%d", fingerprint, unixMilli)
+			if ch.cache.Get(key) != nil && ch.cache.Get(key).Value() {
+				continue
+			}
 			encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128)))
 			meta := metricNameToMeta[fingerprintToName[fingerprint][nameLabel]]
 			err = statement.Append(
@@ -464,6 +436,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
 			if err != nil {
 				return err
 			}
+			ch.cache.Set(key, true, ttlcache.DefaultTTL)
 		}
 
 		start := time.Now()
diff --git a/go.mod b/go.mod
index 47dd7128..e7e4fea2 100644
--- a/go.mod
+++ b/go.mod
@@ -312,6 +312,7 @@ require (
 	github.com/imdario/mergo v0.3.16 // indirect
 	github.com/ionos-cloud/sdk-go/v6 v6.1.11 // indirect
 	github.com/jcmturner/goidentity/v6 v6.0.1 // indirect
+	github.com/jellydator/ttlcache/v3 v3.2.0 // indirect
 	github.com/josharian/intern v1.0.0 // indirect
 	github.com/jpillora/backoff v1.0.0 // indirect
 	github.com/karrick/godirwalk v1.17.0 // indirect
diff --git a/go.sum b/go.sum
index b34ee495..cb0ca824 100644
--- a/go.sum
+++ b/go.sum
@@ -752,6 +752,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6
 github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs=
 github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
 github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
+github.com/jellydator/ttlcache/v3 v3.2.0 h1:6lqVJ8X3ZaUwvzENqPAobDsXNExfUJd61u++uW8a3LE=
+github.com/jellydator/ttlcache/v3 v3.2.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4=
 github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
 github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
 github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
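Note on PATCH 1/3: the dedup protocol is just the Get/Set pair around
statement.Append. The cache key is "<fingerprint>:<hour bucket>" (unixMilli is
truncated to the hour before the loop), so a series is written to
time_series_v4 at most once per hour; the 45-minute TTL, with touch-on-hit
disabled so reads don't extend it, keeps stale hour buckets from accumulating
in memory. A minimal standalone sketch of the same pattern, assuming only
jellydator/ttlcache v3 as in the patch; shouldWrite, the fingerprint value,
and main are illustrative, not part of the patch:

	package main

	import (
		"fmt"
		"time"

		"github.com/jellydator/ttlcache/v3"
	)

	// shouldWrite reports whether a (fingerprint, hour-bucket) pair is seen
	// for the first time, recording it so later calls in the same hour skip.
	func shouldWrite(cache *ttlcache.Cache[string, bool], fingerprint uint64, nowMilli int64) bool {
		hourMilli := nowMilli / 3600000 * 3600000 // truncate to the hour, as the patch does
		key := fmt.Sprintf("%d:%d", fingerprint, hourMilli)
		if item := cache.Get(key); item != nil && item.Value() {
			return false // already written in this hour bucket
		}
		cache.Set(key, true, ttlcache.DefaultTTL)
		return true
	}

	func main() {
		cache := ttlcache.New[string, bool](
			ttlcache.WithTTL[string, bool](45*time.Minute),
			ttlcache.WithDisableTouchOnHit[string, bool](),
		)
		go cache.Start() // background eviction loop, as in NewClickHouse
		defer cache.Stop()

		now := time.Now().UnixMilli()
		fmt.Println(shouldWrite(cache, 42, now)) // true: first sighting this hour
		fmt.Println(shouldWrite(cache, 42, now)) // false: deduplicated
	}

The sketch calls Get once and reuses the returned item; the patch's
ch.cache.Get(key) != nil && ch.cache.Get(key).Value() performs the same check
with two lookups.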
From 6f34c4457ca7878b62929a06d1455997983dfa80 Mon Sep 17 00:00:00 2001
From: Srikanth Chekuri
Date: Mon, 12 Aug 2024 13:50:34 +0530
Subject: [PATCH 2/3] Move usage to new table write block (#360)

---
 .../clickhousemetricsexporter/clickhouse.go | 43 +++++++++----------
 1 file changed, 21 insertions(+), 22 deletions(-)

diff --git a/exporter/clickhousemetricsexporter/clickhouse.go b/exporter/clickhousemetricsexporter/clickhouse.go
index 72ba5e06..21407e2d 100644
--- a/exporter/clickhousemetricsexporter/clickhouse.go
+++ b/exporter/clickhousemetricsexporter/clickhouse.go
@@ -305,7 +305,6 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
 		return err
 	}
 
-	metrics := map[string]usage.Metric{}
 	err = func() error {
 		ctx := context.Background()
 
@@ -318,23 +317,6 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
 			fingerprint := fingerprints[i]
 			for _, s := range ts.Samples {
 
-				// usage collection checks
-				tenant := "default"
-				collectUsage := true
-				for _, val := range timeSeries[fingerprint] {
-					if val.Name == nameLabel && (strings.HasPrefix(val.Value, "signoz_") || strings.HasPrefix(val.Value, "chi_") || strings.HasPrefix(val.Value, "otelcol_")) {
-						collectUsage = false
-						break
-					}
-					if val.Name == "tenant" {
-						tenant = val.Value
-					}
-				}
-
-				if collectUsage {
-					usage.AddMetric(metrics, tenant, 1, int64(len(s.String())))
-				}
-
 				err = statement.Append(
 					fingerprintToName[fingerprint][nameLabel],
 					fingerprint,
@@ -359,12 +341,9 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
 		return err
 	}
 
-	for k, v := range metrics {
-		stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(usage.TagTenantKey, k), tag.Upsert(usage.TagExporterIdKey, ch.exporterID.String())}, ExporterSigNozSentMetricPoints.M(int64(v.Count)), ExporterSigNozSentMetricPointsBytes.M(int64(v.Size)))
-	}
-
 	// write to distributed_samples_v4 table
 	if ch.writeTSToV4 {
+		metrics := map[string]usage.Metric{}
 		err = func() error {
 			statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, unix_milli, value) VALUES (?, ?, ?, ?, ?, ?)", ch.database, DISTRIBUTED_SAMPLES_TABLE_V4), driver.WithReleaseConnection())
 			if err != nil {
@@ -386,6 +365,23 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
 				if err != nil {
 					return err
 				}
+
+				// usage collection checks
+				tenant := "default"
+				collectUsage := true
+				for _, val := range timeSeries[fingerprint] {
+					if val.Name == nameLabel && (strings.HasPrefix(val.Value, "signoz_") || strings.HasPrefix(val.Value, "chi_") || strings.HasPrefix(val.Value, "otelcol_")) {
+						collectUsage = false
+						break
+					}
+					if val.Name == "tenant" {
+						tenant = val.Value
+					}
+				}
+
+				if collectUsage {
+					usage.AddMetric(metrics, tenant, 1, int64(len(s.String())))
+				}
 			}
 		}
 
@@ -402,6 +398,9 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
 		if err != nil {
 			return err
 		}
+		for k, v := range metrics {
+			stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(usage.TagTenantKey, k), tag.Upsert(usage.TagExporterIdKey, ch.exporterID.String())}, ExporterSigNozSentMetricPoints.M(int64(v.Count)), ExporterSigNozSentMetricPointsBytes.M(int64(v.Size)))
+		}
 	}
 
 	// write to distributed_time_series_v4 table
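Note on PATCH 2/3: this moves the per-tenant usage accounting from the older
samples write loop into the distributed_samples_v4 write closure, so points
are counted against the batch that actually ships to the v4 table, and the
counters are emitted via stats.RecordWithTags only after that batch's Send
succeeds. A standalone sketch of the accounting rule; Metric, label, and
addUsage only approximate the usage package (not shown in the patch), and
"__name__" is assumed to be the value of the nameLabel constant:

	package main

	import (
		"fmt"
		"strings"
	)

	// Metric approximates the per-tenant counters the usage package keeps
	// (only Count and Size are visible in the patch).
	type Metric struct {
		Count int64
		Size  int64
	}

	type label struct{ Name, Value string }

	// addUsage applies the patch's checks: self-telemetry (signoz_*, chi_*,
	// otelcol_* metrics) is never counted, and a "tenant" label, when
	// present, selects the bucket; otherwise "default" is used.
	func addUsage(metrics map[string]Metric, labels []label, sampleSize int64) {
		tenant := "default"
		for _, l := range labels {
			if l.Name == "__name__" && (strings.HasPrefix(l.Value, "signoz_") ||
				strings.HasPrefix(l.Value, "chi_") ||
				strings.HasPrefix(l.Value, "otelcol_")) {
				return // internal metric: skip usage collection
			}
			if l.Name == "tenant" {
				tenant = l.Value
			}
		}
		m := metrics[tenant]
		m.Count++
		m.Size += sampleSize
		metrics[tenant] = m
	}

	func main() {
		metrics := map[string]Metric{}
		addUsage(metrics, []label{{"__name__", "http_requests_total"}, {"tenant", "acme"}}, 24)
		addUsage(metrics, []label{{"__name__", "signoz_calls_total"}}, 24)
		fmt.Println(metrics) // map[acme:{1 24}]: the signoz_ sample was not counted
	}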
From 26a2061975ce7c6d018faf4f917fc982d1a47227 Mon Sep 17 00:00:00 2001
From: Srikanth Chekuri
Date: Tue, 13 Aug 2024 11:48:38 +0530
Subject: [PATCH 3/3] Add pre-aggregation tables with 5m and 30m resolution
 for samples v4 (#361)

---
 .../000008_samples_agg_tables.down.sql |   9 ++
 .../000008_samples_agg_tables.up.sql   | 134 ++++++++++++++++++
 2 files changed, 143 insertions(+)
 create mode 100644 migrationmanager/migrators/metrics/migrations/000008_samples_agg_tables.down.sql
 create mode 100644 migrationmanager/migrators/metrics/migrations/000008_samples_agg_tables.up.sql

diff --git a/migrationmanager/migrators/metrics/migrations/000008_samples_agg_tables.down.sql b/migrationmanager/migrators/metrics/migrations/000008_samples_agg_tables.down.sql
new file mode 100644
index 00000000..d63c4f35
--- /dev/null
+++ b/migrationmanager/migrators/metrics/migrations/000008_samples_agg_tables.down.sql
@@ -0,0 +1,9 @@
+DROP TABLE IF EXISTS signoz_metrics.samples_v4_agg_5m ON CLUSTER {{.SIGNOZ_CLUSTER}};
+DROP TABLE IF EXISTS signoz_metrics.samples_v4_agg_5m_mv ON CLUSTER {{.SIGNOZ_CLUSTER}};
+DROP TABLE IF EXISTS signoz_metrics.samples_v4_agg_30m ON CLUSTER {{.SIGNOZ_CLUSTER}};
+DROP TABLE IF EXISTS signoz_metrics.samples_v4_agg_30m_mv ON CLUSTER {{.SIGNOZ_CLUSTER}};
+DROP TABLE IF EXISTS signoz_metrics.distributed_samples_v4_agg_5m ON CLUSTER {{.SIGNOZ_CLUSTER}};
+DROP TABLE IF EXISTS signoz_metrics.distributed_samples_v4_agg_30m ON CLUSTER {{.SIGNOZ_CLUSTER}};
+DROP TABLE IF EXISTS signoz_metrics.time_series_v4_1week ON CLUSTER {{.SIGNOZ_CLUSTER}};
+DROP TABLE IF EXISTS signoz_metrics.distributed_time_series_v4_1week ON CLUSTER {{.SIGNOZ_CLUSTER}};
+DROP TABLE IF EXISTS signoz_metrics.time_series_v4_1week_mv ON CLUSTER {{.SIGNOZ_CLUSTER}};
diff --git a/migrationmanager/migrators/metrics/migrations/000008_samples_agg_tables.up.sql b/migrationmanager/migrators/metrics/migrations/000008_samples_agg_tables.up.sql
new file mode 100644
index 00000000..ae86a99a
--- /dev/null
+++ b/migrationmanager/migrators/metrics/migrations/000008_samples_agg_tables.up.sql
@@ -0,0 +1,134 @@
+-- 5m aggregation table
+CREATE TABLE IF NOT EXISTS signoz_metrics.samples_v4_agg_5m ON CLUSTER {{.SIGNOZ_CLUSTER}}
+(
+    `env` LowCardinality(String) DEFAULT 'default',
+    `temporality` LowCardinality(String) DEFAULT 'Unspecified',
+    `metric_name` LowCardinality(String),
+    `fingerprint` UInt64 CODEC(ZSTD(1)),
+    `unix_milli` Int64 CODEC(DoubleDelta, ZSTD(1)),
+    `last` SimpleAggregateFunction(anyLast, Float64) CODEC(ZSTD(1)),
+    `min` SimpleAggregateFunction(min, Float64) CODEC(ZSTD(1)),
+    `max` SimpleAggregateFunction(max, Float64) CODEC(ZSTD(1)),
+    `sum` SimpleAggregateFunction(sum, Float64) CODEC(ZSTD(1)),
+    `count` SimpleAggregateFunction(sum, UInt64) CODEC(ZSTD(1))
+)
+ENGINE = {{.SIGNOZ_REPLICATED}}AggregatingMergeTree
+PARTITION BY toDate(unix_milli / 1000)
+ORDER BY (env, temporality, metric_name, fingerprint, unix_milli)
+TTL toDateTime(unix_milli/1000) + INTERVAL 2592000 SECOND DELETE
+SETTINGS ttl_only_drop_parts = 1;
+
+-- 30m aggregation table
+CREATE TABLE IF NOT EXISTS signoz_metrics.samples_v4_agg_30m ON CLUSTER {{.SIGNOZ_CLUSTER}}
+(
+    `env` LowCardinality(String) DEFAULT 'default',
+    `temporality` LowCardinality(String) DEFAULT 'Unspecified',
+    `metric_name` LowCardinality(String),
+    `fingerprint` UInt64 CODEC(ZSTD(1)),
+    `unix_milli` Int64 CODEC(DoubleDelta, ZSTD(1)),
+    `last` SimpleAggregateFunction(anyLast, Float64) CODEC(ZSTD(1)),
+    `min` SimpleAggregateFunction(min, Float64) CODEC(ZSTD(1)),
+    `max` SimpleAggregateFunction(max, Float64) CODEC(ZSTD(1)),
+    `sum` SimpleAggregateFunction(sum, Float64) CODEC(ZSTD(1)),
+    `count` SimpleAggregateFunction(sum, UInt64) CODEC(ZSTD(1))
+)
+ENGINE = {{.SIGNOZ_REPLICATED}}AggregatingMergeTree
+PARTITION BY toDate(unix_milli / 1000)
+ORDER BY (env, temporality, metric_name, fingerprint, unix_milli)
+TTL toDateTime(unix_milli/1000) + INTERVAL 2592000 SECOND DELETE
+SETTINGS ttl_only_drop_parts = 1;
+
+-- 5m aggregation materialized view
+CREATE MATERIALIZED VIEW IF NOT EXISTS signoz_metrics.samples_v4_agg_5m_mv ON CLUSTER {{.SIGNOZ_CLUSTER}}
+TO signoz_metrics.samples_v4_agg_5m AS
+SELECT
+    env,
+    temporality,
+    metric_name,
+    fingerprint,
+    intDiv(unix_milli, 300000) * 300000 as unix_milli,
+    anyLast(value) as last,
+    min(value) as min,
+    max(value) as max,
+    sum(value) as sum,
+    count(*) as count
+FROM signoz_metrics.samples_v4
+GROUP BY
+    env,
+    temporality,
+    metric_name,
+    fingerprint,
+    unix_milli;
+
+-- 30m aggregation materialized view
+CREATE MATERIALIZED VIEW IF NOT EXISTS signoz_metrics.samples_v4_agg_30m_mv ON CLUSTER {{.SIGNOZ_CLUSTER}}
+TO signoz_metrics.samples_v4_agg_30m AS
+SELECT
+    env,
+    temporality,
+    metric_name,
+    fingerprint,
+    intDiv(unix_milli, 1800000) * 1800000 as unix_milli,
+    anyLast(last) as last,
+    min(min) as min,
+    max(max) as max,
+    sum(sum) as sum,
+    sum(count) as count
+FROM signoz_metrics.samples_v4_agg_5m
+GROUP BY
+    env,
+    temporality,
+    metric_name,
+    fingerprint,
+    unix_milli;
+
+-- 5m aggregation distributed table
+CREATE TABLE IF NOT EXISTS signoz_metrics.distributed_samples_v4_agg_5m ON CLUSTER {{.SIGNOZ_CLUSTER}}
+AS signoz_metrics.samples_v4_agg_5m
+ENGINE = Distributed('{{.SIGNOZ_CLUSTER}}', 'signoz_metrics', 'samples_v4_agg_5m', cityHash64(env, temporality, metric_name, fingerprint));
+
+-- 30m aggregation distributed table
+CREATE TABLE IF NOT EXISTS signoz_metrics.distributed_samples_v4_agg_30m ON CLUSTER {{.SIGNOZ_CLUSTER}}
+AS signoz_metrics.samples_v4_agg_30m
+ENGINE = Distributed('{{.SIGNOZ_CLUSTER}}', 'signoz_metrics', 'samples_v4_agg_30m', cityHash64(env, temporality, metric_name, fingerprint));
+
+-- time series v4 1 week table
+CREATE TABLE IF NOT EXISTS signoz_metrics.time_series_v4_1week ON CLUSTER {{.SIGNOZ_CLUSTER}} (
+    env LowCardinality(String) DEFAULT 'default',
+    temporality LowCardinality(String) DEFAULT 'Unspecified',
+    metric_name LowCardinality(String),
+    description LowCardinality(String) DEFAULT '' CODEC(ZSTD(1)),
+    unit LowCardinality(String) DEFAULT '' CODEC(ZSTD(1)),
+    type LowCardinality(String) DEFAULT '' CODEC(ZSTD(1)),
+    is_monotonic Bool DEFAULT false CODEC(ZSTD(1)),
+    fingerprint UInt64 CODEC(Delta, ZSTD),
+    unix_milli Int64 CODEC(Delta, ZSTD),
+    labels String CODEC(ZSTD(5)),
+    INDEX idx_labels labels TYPE ngrambf_v1(4, 1024, 3, 0) GRANULARITY 1
+)
+ENGINE = {{.SIGNOZ_REPLICATED}}ReplacingMergeTree
+PARTITION BY toDate(unix_milli / 1000)
+ORDER BY (env, temporality, metric_name, fingerprint, unix_milli)
+TTL toDateTime(unix_milli/1000) + INTERVAL 2592000 SECOND DELETE
+SETTINGS ttl_only_drop_parts = 1;
+
+-- time series v4 1 week materialized view
+CREATE MATERIALIZED VIEW IF NOT EXISTS signoz_metrics.time_series_v4_1week_mv ON CLUSTER {{.SIGNOZ_CLUSTER}}
+TO signoz_metrics.time_series_v4_1week
+AS SELECT
+    env,
+    temporality,
+    metric_name,
+    description,
+    unit,
+    type,
+    is_monotonic,
+    fingerprint,
+    floor(unix_milli/604800000)*604800000 AS unix_milli,
+    labels
+FROM signoz_metrics.time_series_v4_1day;
+
+-- time series v4 1 week distributed table
+CREATE TABLE IF NOT EXISTS signoz_metrics.distributed_time_series_v4_1week ON CLUSTER {{.SIGNOZ_CLUSTER}}
+AS signoz_metrics.time_series_v4_1week
+ENGINE = Distributed("{{.SIGNOZ_CLUSTER}}", signoz_metrics, time_series_v4_1week, cityHash64(env, temporality, metric_name, fingerprint));
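Note on PATCH 3/3: the rollup chain is samples_v4 -> samples_v4_agg_5m (the 5m
materialized view buckets with intDiv(unix_milli, 300000) * 300000) ->
samples_v4_agg_30m (the 30m view reads the 5m table rather than rescanning raw
samples, which is why its count is sum(count) while the 5m view uses
count(*)). time_series_v4_1week is built the same way from
time_series_v4_1day, with unix_milli floored to 604800000 ms (one week).
Because AggregatingMergeTree collapses rows for the same ORDER BY key only
when parts merge, readers must re-aggregate at query time. A sketch of reading
the 5m rollup with clickhouse-go; the address, metric name, and time window
are illustrative, while the table and column names come from the migration
above:

	package main

	import (
		"context"
		"fmt"
		"log"
		"time"

		clickhouse "github.com/ClickHouse/clickhouse-go/v2"
	)

	func main() {
		conn, err := clickhouse.Open(&clickhouse.Options{Addr: []string{"127.0.0.1:9000"}})
		if err != nil {
			log.Fatal(err)
		}
		defer conn.Close()

		// Re-aggregate per (fingerprint, bucket): unmerged parts can hold
		// several partial rows for the same series and 5m bucket.
		query := `
			SELECT
				fingerprint,
				unix_milli,
				max(max)   AS max_value,
				sum(sum)   AS sum_value,
				sum(count) AS num_samples
			FROM signoz_metrics.distributed_samples_v4_agg_5m
			WHERE metric_name = 'http_requests_total' AND unix_milli >= ?
			GROUP BY fingerprint, unix_milli
			ORDER BY unix_milli`

		since := time.Now().Add(-time.Hour).UnixMilli()
		rows, err := conn.Query(context.Background(), query, since)
		if err != nil {
			log.Fatal(err)
		}
		defer rows.Close()

		for rows.Next() {
			var fingerprint uint64
			var unixMilli int64
			var maxValue, sumValue float64
			var numSamples uint64
			if err := rows.Scan(&fingerprint, &unixMilli, &maxValue, &sumValue, &numSamples); err != nil {
				log.Fatal(err)
			}
			fmt.Println(fingerprint, unixMilli, maxValue, sumValue, numSamples)
		}
	}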