Merge branch 'main' into chore/update-build
makeavish authored Jul 18, 2023
2 parents 56e850f + 1b4097d commit 168f041
Showing 10 changed files with 224 additions and 112 deletions.
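In brief: this merge adds aggregation-temporality support to the ClickHouse metrics exporter (a pmetric temporality map threaded through Storage.Write, a __temporality__ label on series, a temporality column on the v2 time-series tables, and a new time_series_v3 table keyed by env, temporality, metric name, and fingerprint), replaces the fingerprint-reloading goroutine with a shard-count watcher that resets the in-memory series cache when the cluster topology changes, and exposes the watcher's poll interval as a watcher_interval setting.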
3 changes: 2 additions & 1 deletion exporter/clickhousemetricsexporter/base/base.go
@@ -23,6 +23,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/prometheus/prometheus/prompb"
)
@@ -34,7 +35,7 @@ type Storage interface {
// Read(context.Context, []Query) (*prompb.ReadResponse, error)

// Write puts data into storage.
- Write(context.Context, *prompb.WriteRequest) error
+ Write(context.Context, *prompb.WriteRequest, map[string]pmetric.AggregationTemporality) error

// Returns the DB conn.
GetDBConn() interface{}
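The extra argument threads each metric's aggregation temporality down to the storage layer. Below is a minimal sketch of how a caller could assemble that map from a pmetric payload before invoking Write; the package placement, helper name, and traversal are illustrative, not code from this repository:

	package clickhousemetricsexporter // placement assumed for the sketch

	import "go.opentelemetry.io/collector/pdata/pmetric"

	// buildTemporalityMap records the aggregation temporality reported for each
	// metric name in the payload. Gauges and summaries carry no temporality, so
	// they are skipped and later default to "Unspecified".
	func buildTemporalityMap(md pmetric.Metrics) map[string]pmetric.AggregationTemporality {
		out := make(map[string]pmetric.AggregationTemporality)
		rms := md.ResourceMetrics()
		for i := 0; i < rms.Len(); i++ {
			sms := rms.At(i).ScopeMetrics()
			for j := 0; j < sms.Len(); j++ {
				ms := sms.At(j).Metrics()
				for k := 0; k < ms.Len(); k++ {
					m := ms.At(k)
					switch m.Type() {
					case pmetric.MetricTypeSum:
						out[m.Name()] = m.Sum().AggregationTemporality()
					case pmetric.MetricTypeHistogram:
						out[m.Name()] = m.Histogram().AggregationTemporality()
					case pmetric.MetricTypeExponentialHistogram:
						out[m.Name()] = m.ExponentialHistogram().AggregationTemporality()
					}
				}
			}
		}
		return out
	}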
187 changes: 141 additions & 46 deletions exporter/clickhousemetricsexporter/clickhouse.go
@@ -31,6 +31,8 @@ import (
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pmetric"
semconv "go.opentelemetry.io/collector/semconv/v1.13.0"

"github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/base"
"github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/utils/timeseries"
@@ -39,14 +41,18 @@ import (
)

const (
- namespace = "promhouse"
- subsystem = "clickhouse"
- nameLabel = "__name__"
- CLUSTER = "cluster"
- DISTRIBUTED_TIME_SERIES_TABLE = "distributed_time_series_v2"
- DISTRIBUTED_SAMPLES_TABLE = "distributed_samples_v2"
- TIME_SERIES_TABLE = "time_series_v2"
- SAMPLES_TABLE = "samples_v2"
+ namespace = "promhouse"
+ subsystem = "clickhouse"
+ nameLabel = "__name__"
+ CLUSTER = "cluster"
+ DISTRIBUTED_TIME_SERIES_TABLE = "distributed_time_series_v2"
+ DISTRIBUTED_TIME_SERIES_TABLE_V3 = "distributed_time_series_v3"
+ DISTRIBUTED_SAMPLES_TABLE = "distributed_samples_v2"
+ TIME_SERIES_TABLE = "time_series_v2"
+ TIME_SERIES_TABLE_V3 = "time_series_v3"
+ SAMPLES_TABLE = "samples_v2"
+ temporalityLabel = "__temporality__"
+ envLabel = "env"
)

// clickHouse implements storage interface for the ClickHouse.
@@ -60,7 +66,9 @@ type clickHouse struct {
// Maintains the lookup map for fingerprints that are
// written to time series table. This map is used to eliminate the
// unnecessary writes to table for the records that already exist.
- timeSeries map[uint64]struct{}
+ timeSeries map[uint64]struct{}
+ prevShardCount uint64
+ watcherInterval time.Duration

mWrittenTimeSeries prometheus.Counter
}
@@ -70,6 +78,7 @@ type ClickHouseParams struct {
DropDatabase bool
MaxOpenConns int
MaxTimeSeriesInQuery int
+ WatcherInterval time.Duration
}

func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
@@ -135,6 +144,35 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
queries = append(queries, fmt.Sprintf(`
ALTER TABLE %s.%s ON CLUSTER %s MODIFY SETTING ttl_only_drop_parts = 1;`, database, TIME_SERIES_TABLE, CLUSTER))

+ // Add temporality column to time_series table
+ queries = append(queries, fmt.Sprintf(`
+ ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS temporality LowCardinality(String) DEFAULT 'Unspecified' CODEC(ZSTD(5))`, database, TIME_SERIES_TABLE, CLUSTER))
+
+ queries = append(queries, fmt.Sprintf(`
+ ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS temporality LowCardinality(String) DEFAULT 'Unspecified' CODEC(ZSTD(5))`, database, DISTRIBUTED_TIME_SERIES_TABLE, CLUSTER))
+
+ // Add set index on temporality column
+ queries = append(queries, fmt.Sprintf(`
+ ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS temporality_index temporality TYPE SET(3) GRANULARITY 1`, database, TIME_SERIES_TABLE, CLUSTER))
+
+ // Create a new table
+ queries = append(queries, fmt.Sprintf(`
+ CREATE TABLE IF NOT EXISTS %s.%s ON CLUSTER %s (
+ env LowCardinality(String) DEFAULT 'default',
+ temporality LowCardinality(String) DEFAULT 'Unspecified',
+ metric_name LowCardinality(String),
+ fingerprint UInt64 CODEC(Delta, ZSTD),
+ timestamp_ms Int64 CODEC(Delta, ZSTD),
+ labels String CODEC(ZSTD(5))
+ )
+ ENGINE = ReplacingMergeTree
+ PARTITION BY toDate(timestamp_ms / 1000)
+ ORDER BY (env, temporality, metric_name, fingerprint);`, database, TIME_SERIES_TABLE_V3, CLUSTER))
+
+ // Create a new distributed table
+ queries = append(queries, fmt.Sprintf(`
+ CREATE TABLE IF NOT EXISTS %s.%s ON CLUSTER %s AS %s.%s ENGINE = Distributed("%s", %s, %s, cityHash64(env, temporality, metric_name, fingerprint));`, database, DISTRIBUTED_TIME_SERIES_TABLE_V3, CLUSTER, database, TIME_SERIES_TABLE_V3, CLUSTER, database, TIME_SERIES_TABLE_V3))

options := &clickhouse.Options{
Addr: []string{dsnURL.Host},
}
@@ -161,6 +199,15 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
}
}

+ // TODO(srikanthccv): Remove this once we have a better way to handle data and last write
+ removeTTL := fmt.Sprintf(`
+ ALTER TABLE %s.%s ON CLUSTER %s REMOVE TTL;`, database, TIME_SERIES_TABLE, CLUSTER)
+ if err = conn.Exec(context.Background(), removeTTL); err != nil {
+ if !strings.Contains(err.Error(), "Table doesn't have any table TTL expression, cannot remove.") {
+ return nil, err
+ }
+ }

ch := &clickHouse{
conn: conn,
l: l,
@@ -175,60 +222,48 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
Name: "written_time_series",
Help: "Number of written time series.",
}),
+ watcherInterval: params.WatcherInterval,
}

go func() {
ctx := pprof.WithLabels(context.TODO(), pprof.Labels("component", "clickhouse_reloader"))
pprof.SetGoroutineLabels(ctx)
- ch.runTimeSeriesReloader(ctx)
+ ch.shardCountWatcher(ctx)
}()

return ch, nil
}

- // runTimeSeriesReloader periodically queries the time series table
- // and updates the timeSeries lookup map with new fingerprints.
- // One might wonder why is there a need to reload the data from clickhouse
- // when it just suffices to keep track of the fingerprint for the incoming
- // write requests. This is because there could be multiple instance of
- // metric exporters and they would only contain partial info with latter
- // approach.
- func (ch *clickHouse) runTimeSeriesReloader(ctx context.Context) {
- ticker := time.NewTicker(5 * time.Second)
+ func (ch *clickHouse) shardCountWatcher(ctx context.Context) {
+ ticker := time.NewTicker(ch.watcherInterval)
defer ticker.Stop()

- q := fmt.Sprintf(`SELECT DISTINCT fingerprint FROM %s.%s`, ch.database, DISTRIBUTED_TIME_SERIES_TABLE)
+ q := `SELECT count() FROM system.clusters WHERE cluster='cluster'`
for {
- ch.timeSeriesRW.RLock()
- timeSeries := make(map[uint64]struct{}, len(ch.timeSeries))
- ch.timeSeriesRW.RUnlock()
-
err := func() error {
ch.l.Debug(q)
- rows, err := ch.conn.Query(ctx, q)
+ row := ch.conn.QueryRow(ctx, q)
+ if row.Err() != nil {
+ return row.Err()
+ }
+
+ var shardCount uint64
+ err := row.Scan(&shardCount)
if err != nil {
return err
}
- defer rows.Close()
-
- var f uint64
- for rows.Next() {
- if err = rows.Scan(&f); err != nil {
- return err
- }
- timeSeries[f] = struct{}{}
- }
- return rows.Err()
- }()
- if err == nil {
ch.timeSeriesRW.Lock()
- n := len(timeSeries) - len(ch.timeSeries)
- for f, m := range timeSeries {
- ch.timeSeries[f] = m
+ if ch.prevShardCount != shardCount {
+ ch.l.Infof("Shard count changed from %d to %d. Resetting time series map.", ch.prevShardCount, shardCount)
+ ch.timeSeries = make(map[uint64]struct{})
}
+ ch.prevShardCount = shardCount
ch.timeSeriesRW.Unlock()
- ch.l.Debugf("Loaded %d existing time series, %d were unknown to this instance.", len(timeSeries), n)
- } else {
+ return nil
+ }()
+ if err != nil {
ch.l.Error(err)
}
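
Pieced together from the added lines, the new watcher reads as a single polling loop: it counts the shards in system.clusters and, whenever that count changes, clears the fingerprint cache so every series is written again to the re-sharded tables. The ticker/context select at the end is not visible in this hunk and is an assumption:

	func (ch *clickHouse) shardCountWatcher(ctx context.Context) {
		ticker := time.NewTicker(ch.watcherInterval)
		defer ticker.Stop()

		q := `SELECT count() FROM system.clusters WHERE cluster='cluster'`
		for {
			err := func() error {
				ch.l.Debug(q)
				row := ch.conn.QueryRow(ctx, q)
				if row.Err() != nil {
					return row.Err()
				}

				var shardCount uint64
				if err := row.Scan(&shardCount); err != nil {
					return err
				}

				ch.timeSeriesRW.Lock()
				if ch.prevShardCount != shardCount {
					ch.l.Infof("Shard count changed from %d to %d. Resetting time series map.", ch.prevShardCount, shardCount)
					ch.timeSeries = make(map[uint64]struct{})
				}
				ch.prevShardCount = shardCount
				ch.timeSeriesRW.Unlock()
				return nil
			}()
			if err != nil {
				ch.l.Error(err)
			}

			// Assumed loop tail, elided from the hunk: wake on the ticker,
			// stop when the context is cancelled.
			select {
			case <-ticker.C:
			case <-ctx.Done():
				return
			}
		}
	}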

@@ -253,14 +288,15 @@ func (ch *clickHouse) GetDBConn() interface{} {
return ch.conn
}

- func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) error {
+ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metricNameToTemporality map[string]pmetric.AggregationTemporality) error {
// calculate fingerprints, map them to time series
fingerprints := make([]uint64, len(data.Timeseries))
timeSeries := make(map[uint64][]*prompb.Label, len(data.Timeseries))
- fingerprintToName := make(map[uint64]string)
+ fingerprintToName := make(map[uint64]map[string]string)

for i, ts := range data.Timeseries {
var metricName string
+ var env string = "default"
labelsOverridden := make(map[string]*prompb.Label)
for _, label := range ts.Labels {
labelsOverridden[label.Name] = &prompb.Label{
@@ -270,16 +306,32 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) error {
if label.Name == nameLabel {
metricName = label.Value
}
+ if label.Name == semconv.AttributeDeploymentEnvironment || label.Name == sanitize(semconv.AttributeDeploymentEnvironment) {
+ env = label.Value
+ }
}
var labels []*prompb.Label
for _, l := range labelsOverridden {
labels = append(labels, l)
}
+ // add temporality label
+ if metricName != "" {
+ if t, ok := metricNameToTemporality[metricName]; ok {
+ labels = append(labels, &prompb.Label{
+ Name: temporalityLabel,
+ Value: t.String(),
+ })
+ }
+ }
timeseries.SortLabels(labels)
f := timeseries.Fingerprint(labels)
fingerprints[i] = f
timeSeries[f] = labels
- fingerprintToName[f] = metricName
+ if _, ok := fingerprintToName[f]; !ok {
+ fingerprintToName[f] = make(map[string]string)
+ }
+ fingerprintToName[f][nameLabel] = metricName
+ fingerprintToName[f][envLabel] = env
}
if len(fingerprints) != len(timeSeries) {
ch.l.Debugf("got %d fingerprints, but only %d of them were unique time series", len(fingerprints), len(timeSeries))
@@ -304,15 +356,16 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) error {
return err
}

- statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (metric_name, timestamp_ms, fingerprint, labels) VALUES (?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE))
+ statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (metric_name, temporality, timestamp_ms, fingerprint, labels) VALUES (?, ?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE))
if err != nil {
return err
}
timestamp := model.Now().Time().UnixMilli()
for fingerprint, labels := range newTimeSeries {
encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128)))
err = statement.Append(
- fingerprintToName[fingerprint],
+ fingerprintToName[fingerprint][nameLabel],
+ metricNameToTemporality[fingerprintToName[fingerprint][nameLabel]].String(),
timestamp,
fingerprint,
encodedLabels,
@@ -336,6 +389,48 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) error {
return err
}

+ // Write to time_series_v3 table
+ err = func() error {
+ ctx := context.Background()
+ err := ch.conn.Exec(ctx, `SET allow_experimental_object_type = 1`)
+ if err != nil {
+ return err
+ }
+
+ statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, timestamp_ms, labels) VALUES (?, ?, ?, ?, ?, ?)", ch.database, TIME_SERIES_TABLE_V3))
+ if err != nil {
+ return err
+ }
+ timestamp := model.Now().Time().UnixMilli()
+ for fingerprint, labels := range newTimeSeries {
+ encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128)))
+ err = statement.Append(
+ fingerprintToName[fingerprint][envLabel],
+ metricNameToTemporality[fingerprintToName[fingerprint][nameLabel]].String(),
+ fingerprintToName[fingerprint][nameLabel],
+ fingerprint,
+ timestamp,
+ encodedLabels,
+ )
+ if err != nil {
+ return err
+ }
+ }
+
+ start := time.Now()
+ err = statement.Send()
+ ctx, _ = tag.New(ctx,
+ tag.Upsert(exporterKey, string(component.DataTypeMetrics)),
+ tag.Upsert(tableKey, TIME_SERIES_TABLE_V3),
+ )
+ stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
+ return err
+ }()
+
+ if err != nil {
+ return err
+ }

metrics := map[string]usage.Metric{}
err = func() error {
ctx := context.Background()
@@ -367,7 +462,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) error {
}

err = statement.Append(
- fingerprintToName[fingerprint],
+ fingerprintToName[fingerprint][nameLabel],
fingerprint,
s.Timestamp,
s.Value,
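Because time_series_v3 is a ReplacingMergeTree ordered by (env, temporality, metric_name, fingerprint), rows re-inserted for an already-known series (for example after a shard-count reset) are deduplicated at merge time, and lookups that pin the leading key columns prune parts well. A hypothetical query against the new distributed table; the database and metric names here are examples, not values from this diff:

	SELECT
	    fingerprint,
	    max(timestamp_ms) AS last_seen_ms,
	    anyLast(labels)   AS labels
	FROM signoz_metrics.distributed_time_series_v3
	WHERE env = 'default'
	  AND temporality = 'Cumulative'
	  AND metric_name = 'http_server_duration_count'
	GROUP BY fingerprint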
3 changes: 3 additions & 0 deletions exporter/clickhousemetricsexporter/config.go
@@ -16,6 +16,7 @@ package clickhousemetricsexporter

import (
"fmt"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry"
"go.opentelemetry.io/collector/component"
@@ -43,6 +44,8 @@ type Config struct {
// "Enabled" - A boolean field to enable/disable this option. Default is `false`.
// If enabled, all the resource attributes will be converted to metric labels by default.
ResourceToTelemetrySettings resourcetotelemetry.Settings `mapstructure:"resource_to_telemetry_conversion"`

+ WatcherInterval time.Duration `mapstructure:"watcher_interval"`
}

// RemoteWriteQueue allows to configure the remote write queue.
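With the mapstructure tag above, the poll interval becomes a user-facing exporter setting (e.g. `watcher_interval: 30s` in the collector config, the same value the updated config test below asserts); when left unset it presumably falls back to a factory default, which is not part of this diff.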
1 change: 1 addition & 0 deletions exporter/clickhousemetricsexporter/config_test.go
@@ -82,6 +82,7 @@ func Test_loadConfig(t *testing.T) {
},
},
ResourceToTelemetrySettings: resourcetotelemetry.Settings{Enabled: true},
+ WatcherInterval: 30 * time.Second,
})
}

