Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/feat/logs_new_schema' into feat/…
Browse files Browse the repository at this point in the history
…logs_new_schema
  • Loading branch information
nityanandagohain committed Aug 21, 2024
2 parents 789288e + b3ae5e1 commit 0fe6a5f
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 64 deletions.
100 changes: 36 additions & 64 deletions exporter/clickhousemetricsexporter/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
clickhouse "github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/google/uuid"
"github.com/jellydator/ttlcache/v3"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -64,6 +65,8 @@ type clickHouse struct {
database string
maxTimeSeriesInQuery int

cache *ttlcache.Cache[string, bool]

timeSeriesRW sync.RWMutex
// Maintains the lookup map for fingerprints that are
// written to time series table. This map is used to eliminate the
Expand Down Expand Up @@ -113,11 +116,18 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
return nil, fmt.Errorf("could not connect to clickhouse: %s", err)
}

cache := ttlcache.New[string, bool](
ttlcache.WithTTL[string, bool](45*time.Minute),
ttlcache.WithDisableTouchOnHit[string, bool](),
)
go cache.Start()

ch := &clickHouse{
conn: conn,
l: l,
database: options.Auth.Database,
maxTimeSeriesInQuery: params.MaxTimeSeriesInQuery,
cache: cache,

timeSeries: make(map[uint64]struct{}, 8192),

Expand Down Expand Up @@ -295,49 +305,6 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
return err
}

// Write to distributed_time_series_v3 table
err = func() error {

statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, timestamp_ms, labels, description, unit, type, is_monotonic) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE_V3), driver.WithReleaseConnection())
if err != nil {
return err
}
timestamp := model.Now().Time().UnixMilli()
for fingerprint, labels := range newTimeSeries {
encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128)))
meta := metricNameToMeta[fingerprintToName[fingerprint][nameLabel]]
err = statement.Append(
fingerprintToName[fingerprint][envLabel],
meta.Temporality.String(),
fingerprintToName[fingerprint][nameLabel],
fingerprint,
timestamp,
encodedLabels,
meta.Description,
meta.Unit,
meta.Typ.String(),
meta.IsMonotonic,
)
if err != nil {
return err
}
}

start := time.Now()
err = statement.Send()
ctx, _ = tag.New(ctx,
tag.Upsert(exporterKey, string(component.DataTypeMetrics.String())),
tag.Upsert(tableKey, DISTRIBUTED_TIME_SERIES_TABLE_V3),
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
return err
}()

if err != nil {
return err
}

metrics := map[string]usage.Metric{}
err = func() error {
ctx := context.Background()

Expand All @@ -350,23 +317,6 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
fingerprint := fingerprints[i]
for _, s := range ts.Samples {

// usage collection checks
tenant := "default"
collectUsage := true
for _, val := range timeSeries[fingerprint] {
if val.Name == nameLabel && (strings.HasPrefix(val.Value, "signoz_") || strings.HasPrefix(val.Value, "chi_") || strings.HasPrefix(val.Value, "otelcol_")) {
collectUsage = false
break
}
if val.Name == "tenant" {
tenant = val.Value
}
}

if collectUsage {
usage.AddMetric(metrics, tenant, 1, int64(len(s.String())))
}

err = statement.Append(
fingerprintToName[fingerprint][nameLabel],
fingerprint,
Expand All @@ -391,12 +341,9 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
return err
}

for k, v := range metrics {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(usage.TagTenantKey, k), tag.Upsert(usage.TagExporterIdKey, ch.exporterID.String())}, ExporterSigNozSentMetricPoints.M(int64(v.Count)), ExporterSigNozSentMetricPointsBytes.M(int64(v.Size)))
}

// write to distributed_samples_v4 table
if ch.writeTSToV4 {
metrics := map[string]usage.Metric{}
err = func() error {
statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, unix_milli, value) VALUES (?, ?, ?, ?, ?, ?)", ch.database, DISTRIBUTED_SAMPLES_TABLE_V4), driver.WithReleaseConnection())
if err != nil {
Expand All @@ -418,6 +365,23 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
if err != nil {
return err
}

// usage collection checks
tenant := "default"
collectUsage := true
for _, val := range timeSeries[fingerprint] {
if val.Name == nameLabel && (strings.HasPrefix(val.Value, "signoz_") || strings.HasPrefix(val.Value, "chi_") || strings.HasPrefix(val.Value, "otelcol_")) {
collectUsage = false
break
}
if val.Name == "tenant" {
tenant = val.Value
}
}

if collectUsage {
usage.AddMetric(metrics, tenant, 1, int64(len(s.String())))
}
}
}

Expand All @@ -434,6 +398,9 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
if err != nil {
return err
}
for k, v := range metrics {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(usage.TagTenantKey, k), tag.Upsert(usage.TagExporterIdKey, ch.exporterID.String())}, ExporterSigNozSentMetricPoints.M(int64(v.Count)), ExporterSigNozSentMetricPointsBytes.M(int64(v.Size)))
}
}

// write to distributed_time_series_v4 table
Expand All @@ -447,6 +414,10 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
unixMilli := model.Now().Time().UnixMilli() / 3600000 * 3600000

for fingerprint, labels := range timeSeries {
key := fmt.Sprintf("%d:%d", fingerprint, unixMilli)
if ch.cache.Get(key) != nil && ch.cache.Get(key).Value() {
continue
}
encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128)))
meta := metricNameToMeta[fingerprintToName[fingerprint][nameLabel]]
err = statement.Append(
Expand All @@ -464,6 +435,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
if err != nil {
return err
}
ch.cache.Set(key, true, ttlcache.DefaultTTL)
}

start := time.Now()
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ require (
github.com/imdario/mergo v0.3.16 // indirect
github.com/ionos-cloud/sdk-go/v6 v6.1.11 // indirect
github.com/jcmturner/goidentity/v6 v6.0.1 // indirect
github.com/jellydator/ttlcache/v3 v3.2.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/karrick/godirwalk v1.17.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6
github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs=
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/jellydator/ttlcache/v3 v3.2.0 h1:6lqVJ8X3ZaUwvzENqPAobDsXNExfUJd61u++uW8a3LE=
github.com/jellydator/ttlcache/v3 v3.2.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Down migration: removes the 5m/30m sample aggregation tables, their
-- materialized views and distributed proxies, and the 1-week time-series
-- rollup created by the matching up migration.
-- NOTE(review): each base table is dropped before its feeding MV; with
-- TO-style MVs this order appears intentional here, but confirm that no
-- inserts hit the source tables while this migration runs.
DROP TABLE IF EXISTS signoz_metrics.samples_v4_agg_5m ON CLUSTER {{.SIGNOZ_CLUSTER}};
DROP TABLE IF EXISTS signoz_metrics.samples_v4_agg_5m_mv ON CLUSTER {{.SIGNOZ_CLUSTER}};
DROP TABLE IF EXISTS signoz_metrics.samples_v4_agg_30m ON CLUSTER {{.SIGNOZ_CLUSTER}};
DROP TABLE IF EXISTS signoz_metrics.samples_v4_agg_30m_mv ON CLUSTER {{.SIGNOZ_CLUSTER}};
DROP TABLE IF EXISTS signoz_metrics.distributed_samples_v4_agg_5m ON CLUSTER {{.SIGNOZ_CLUSTER}};
DROP TABLE IF EXISTS signoz_metrics.distributed_samples_v4_agg_30m ON CLUSTER {{.SIGNOZ_CLUSTER}};
DROP TABLE IF EXISTS signoz_metrics.time_series_v4_1week ON CLUSTER {{.SIGNOZ_CLUSTER}};
DROP TABLE IF EXISTS signoz_metrics.distributed_time_series_v4_1week ON CLUSTER {{.SIGNOZ_CLUSTER}};
DROP TABLE IF EXISTS signoz_metrics.time_series_v4_1week_mv ON CLUSTER {{.SIGNOZ_CLUSTER}};
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
-- 5m aggregation table: samples pre-aggregated into 5-minute buckets,
-- populated by samples_v4_agg_5m_mv. SimpleAggregateFunction columns let the
-- AggregatingMergeTree fold partial rows for the same bucket at merge time.
CREATE TABLE IF NOT EXISTS signoz_metrics.samples_v4_agg_5m ON CLUSTER {{.SIGNOZ_CLUSTER}}
(
    `env` LowCardinality(String) DEFAULT 'default',
    `temporality` LowCardinality(String) DEFAULT 'Unspecified',
    `metric_name` LowCardinality(String),
    `fingerprint` UInt64 CODEC(ZSTD(1)),
    -- Bucket start, in milliseconds since epoch (aligned to 5m by the MV).
    `unix_milli` Int64 CODEC(DoubleDelta, ZSTD(1)),
    `last` SimpleAggregateFunction(anyLast, Float64) CODEC(ZSTD(1)),
    `min` SimpleAggregateFunction(min, Float64) CODEC(ZSTD(1)),
    `max` SimpleAggregateFunction(max, Float64) CODEC(ZSTD(1)),
    `sum` SimpleAggregateFunction(sum, Float64) CODEC(ZSTD(1)),
    -- Partial counts are combined with sum, hence sum over UInt64.
    `count` SimpleAggregateFunction(sum, UInt64) CODEC(ZSTD(1))
)
ENGINE = {{.SIGNOZ_REPLICATED}}AggregatingMergeTree
PARTITION BY toDate(unix_milli / 1000)
ORDER BY (env, temporality, metric_name, fingerprint, unix_milli)
-- Retain 2592000 s (30 days); expired data is removed by dropping whole parts.
TTL toDateTime(unix_milli/1000) + INTERVAL 2592000 SECOND DELETE
SETTINGS ttl_only_drop_parts = 1;

-- 30m aggregation table: samples pre-aggregated into 30-minute buckets,
-- populated by samples_v4_agg_30m_mv (which cascades off the 5m table).
-- Schema mirrors samples_v4_agg_5m; see that table for column notes.
CREATE TABLE IF NOT EXISTS signoz_metrics.samples_v4_agg_30m ON CLUSTER {{.SIGNOZ_CLUSTER}}
(
    `env` LowCardinality(String) DEFAULT 'default',
    `temporality` LowCardinality(String) DEFAULT 'Unspecified',
    `metric_name` LowCardinality(String),
    `fingerprint` UInt64 CODEC(ZSTD(1)),
    -- Bucket start, in milliseconds since epoch (aligned to 30m by the MV).
    `unix_milli` Int64 CODEC(DoubleDelta, ZSTD(1)),
    `last` SimpleAggregateFunction(anyLast, Float64) CODEC(ZSTD(1)),
    `min` SimpleAggregateFunction(min, Float64) CODEC(ZSTD(1)),
    `max` SimpleAggregateFunction(max, Float64) CODEC(ZSTD(1)),
    `sum` SimpleAggregateFunction(sum, Float64) CODEC(ZSTD(1)),
    `count` SimpleAggregateFunction(sum, UInt64) CODEC(ZSTD(1))
)
ENGINE = {{.SIGNOZ_REPLICATED}}AggregatingMergeTree
PARTITION BY toDate(unix_milli / 1000)
ORDER BY (env, temporality, metric_name, fingerprint, unix_milli)
-- Retain 2592000 s (30 days); expired data is removed by dropping whole parts.
TTL toDateTime(unix_milli/1000) + INTERVAL 2592000 SECOND DELETE
SETTINGS ttl_only_drop_parts = 1;

-- 5m aggregation materialized view: fires on each insert into samples_v4,
-- buckets the inserted block to 5-minute windows (intDiv by 300000 ms) and
-- writes partial aggregates into samples_v4_agg_5m. Final folding across
-- blocks happens at merge time in the target AggregatingMergeTree.
CREATE MATERIALIZED VIEW IF NOT EXISTS signoz_metrics.samples_v4_agg_5m_mv ON CLUSTER {{.SIGNOZ_CLUSTER}}
TO signoz_metrics.samples_v4_agg_5m AS
SELECT
    env,
    temporality,
    metric_name,
    fingerprint,
    -- Truncate the timestamp down to the enclosing 5-minute boundary.
    intDiv(unix_milli, 300000) * 300000 as unix_milli,
    anyLast(value) as last,
    min(value) as min,
    max(value) as max,
    sum(value) as sum,
    count(*) as count
FROM signoz_metrics.samples_v4
GROUP BY
    env,
    temporality,
    metric_name,
    fingerprint,
    unix_milli;

-- 30m aggregation materialized view: cascades off the 5m table — re-buckets
-- 5m partial aggregates into 30-minute windows. min/max/sum/count combine
-- exactly across partials.
-- NOTE(review): anyLast(last) over a partial insert block has no guaranteed
-- ordering, so "last" is best-effort here — inherent to this cascading
-- pattern; confirm that is acceptable for queries using this table.
CREATE MATERIALIZED VIEW IF NOT EXISTS signoz_metrics.samples_v4_agg_30m_mv ON CLUSTER {{.SIGNOZ_CLUSTER}}
TO signoz_metrics.samples_v4_agg_30m AS
SELECT
    env,
    temporality,
    metric_name,
    fingerprint,
    -- Truncate the 5m bucket down to the enclosing 30-minute boundary.
    intDiv(unix_milli, 1800000) * 1800000 as unix_milli,
    anyLast(last) as last,
    min(min) as min,
    max(max) as max,
    sum(sum) as sum,
    sum(count) as count
FROM signoz_metrics.samples_v4_agg_5m
GROUP BY
    env,
    temporality,
    metric_name,
    fingerprint,
    unix_milli;

-- 5m aggregation distributed table: cluster-wide proxy over the per-shard
-- samples_v4_agg_5m, sharded by series identity (env, temporality, name,
-- fingerprint) so all rows for one series land on the same shard.
CREATE TABLE IF NOT EXISTS signoz_metrics.distributed_samples_v4_agg_5m ON CLUSTER {{.SIGNOZ_CLUSTER}}
AS signoz_metrics.samples_v4_agg_5m
ENGINE = Distributed('{{.SIGNOZ_CLUSTER}}', 'signoz_metrics', 'samples_v4_agg_5m', cityHash64(env, temporality, metric_name, fingerprint));

-- 30m aggregation distributed table: cluster-wide proxy over the per-shard
-- samples_v4_agg_30m, using the same series-identity sharding key as the
-- 5m distributed table so both rollups of a series colocate.
CREATE TABLE IF NOT EXISTS signoz_metrics.distributed_samples_v4_agg_30m ON CLUSTER {{.SIGNOZ_CLUSTER}}
AS signoz_metrics.samples_v4_agg_30m
ENGINE = Distributed('{{.SIGNOZ_CLUSTER}}', 'signoz_metrics', 'samples_v4_agg_30m', cityHash64(env, temporality, metric_name, fingerprint));

-- time series v4 1 week table: per-series metadata (labels + metric meta)
-- rolled up to 1-week buckets, fed by time_series_v4_1week_mv.
-- ReplacingMergeTree collapses repeated rows for the same sorting key at
-- merge time, so each (series, week bucket) converges to a single row.
CREATE TABLE IF NOT EXISTS signoz_metrics.time_series_v4_1week ON CLUSTER {{.SIGNOZ_CLUSTER}} (
    env LowCardinality(String) DEFAULT 'default',
    temporality LowCardinality(String) DEFAULT 'Unspecified',
    metric_name LowCardinality(String),
    description LowCardinality(String) DEFAULT '' CODEC(ZSTD(1)),
    unit LowCardinality(String) DEFAULT '' CODEC(ZSTD(1)),
    type LowCardinality(String) DEFAULT '' CODEC(ZSTD(1)),
    is_monotonic Bool DEFAULT false CODEC(ZSTD(1)),
    fingerprint UInt64 CODEC(Delta, ZSTD),
    -- Week-bucket start in ms since epoch (aligned by the feeding MV).
    unix_milli Int64 CODEC(Delta, ZSTD),
    labels String CODEC(ZSTD(5)),
    -- ngram bloom filter to accelerate substring/LIKE searches over the
    -- serialized labels string.
    INDEX idx_labels labels TYPE ngrambf_v1(4, 1024, 3, 0) GRANULARITY 1
)
ENGINE = {{.SIGNOZ_REPLICATED}}ReplacingMergeTree
PARTITION BY toDate(unix_milli / 1000)
ORDER BY (env, temporality, metric_name, fingerprint, unix_milli)
-- Retain 2592000 s (30 days); expired data is removed by dropping whole parts.
TTL toDateTime(unix_milli/1000) + INTERVAL 2592000 SECOND DELETE
SETTINGS ttl_only_drop_parts = 1;

-- time series v4 1 week materialized view: fires on each insert into
-- time_series_v4_1day, truncates unix_milli down to the enclosing 7-day
-- (604800000 ms) boundary, and forwards the row into time_series_v4_1week,
-- where ReplacingMergeTree dedupes repeats within the same week bucket.
CREATE MATERIALIZED VIEW IF NOT EXISTS signoz_metrics.time_series_v4_1week_mv ON CLUSTER {{.SIGNOZ_CLUSTER}}
TO signoz_metrics.time_series_v4_1week
AS SELECT
    env,
    temporality,
    metric_name,
    description,
    unit,
    type,
    is_monotonic,
    fingerprint,
    floor(unix_milli/604800000)*604800000 AS unix_milli,
    labels
FROM signoz_metrics.time_series_v4_1day;

-- time series v4 1 week distributed table: cluster-wide proxy over the
-- per-shard 1-week rollup, sharded by series identity so all rows for one
-- series colocate with its samples.
CREATE TABLE IF NOT EXISTS signoz_metrics.distributed_time_series_v4_1week ON CLUSTER {{.SIGNOZ_CLUSTER}}
AS signoz_metrics.time_series_v4_1week
-- Fixed: use single-quoted string literals for the Distributed() arguments,
-- matching every other Distributed definition in this migration. The previous
-- form used double quotes, which ClickHouse parses as identifier quoting
-- rather than string literals, and left db/table as bare identifiers.
ENGINE = Distributed('{{.SIGNOZ_CLUSTER}}', 'signoz_metrics', 'time_series_v4_1week', cityHash64(env, temporality, metric_name, fingerprint));

0 comments on commit 0fe6a5f

Please sign in to comment.