Add experimental support for delta temporality (#121)
srikanthccv committed Jul 13, 2023
1 parent 7969aef commit 8d74730
Showing 5 changed files with 178 additions and 74 deletions.
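
Background for the change: OTLP sums and histograms declare an aggregation temporality — cumulative points carry a running total since a fixed start time, while delta points carry only the change since the previous point. Before this commit the exporter rejected non-cumulative series outright; this commit records each metric's temporality instead and persists it alongside the series. A minimal sketch (not part of the commit) of reading temporality from pdata, mirroring the switch added in exporter.go below:

package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pmetric"
)

// temporalityOf mirrors the switch added in exporter.go: gauges and
// summaries carry no temporality, so they map to Unspecified.
func temporalityOf(metric pmetric.Metric) pmetric.AggregationTemporality {
	switch metric.Type() {
	case pmetric.MetricTypeSum:
		return metric.Sum().AggregationTemporality()
	case pmetric.MetricTypeHistogram:
		return metric.Histogram().AggregationTemporality()
	default:
		return pmetric.AggregationTemporalityUnspecified
	}
}

func main() {
	m := pmetric.NewMetric()
	m.SetName("requests_total")
	m.SetEmptySum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
	fmt.Println(temporalityOf(m)) // prints "Delta"
}
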
3 changes: 2 additions & 1 deletion exporter/clickhousemetricsexporter/base/base.go
@@ -23,6 +23,7 @@ import (
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
+	"go.opentelemetry.io/collector/pdata/pmetric"
 
 	"github.com/prometheus/prometheus/prompb"
 )
@@ -34,7 +35,7 @@ type Storage interface {
 	// Read(context.Context, []Query) (*prompb.ReadResponse, error)
 
 	// Write puts data into storage.
-	Write(context.Context, *prompb.WriteRequest) error
+	Write(context.Context, *prompb.WriteRequest, map[string]pmetric.AggregationTemporality) error
 
 	// Returns the DB conn.
 	GetDBConn() interface{}
123 changes: 109 additions & 14 deletions exporter/clickhousemetricsexporter/clickhouse.go
@@ -31,6 +31,8 @@ import (
 	"go.opencensus.io/stats"
 	"go.opencensus.io/tag"
 	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+	semconv "go.opentelemetry.io/collector/semconv/v1.13.0"
 
 	"github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/base"
 	"github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/utils/timeseries"
@@ -39,14 +41,18 @@ import (
 )
 
 const (
-	namespace                     = "promhouse"
-	subsystem                     = "clickhouse"
-	nameLabel                     = "__name__"
-	CLUSTER                       = "cluster"
-	DISTRIBUTED_TIME_SERIES_TABLE = "distributed_time_series_v2"
-	DISTRIBUTED_SAMPLES_TABLE     = "distributed_samples_v2"
-	TIME_SERIES_TABLE             = "time_series_v2"
-	SAMPLES_TABLE                 = "samples_v2"
+	namespace                        = "promhouse"
+	subsystem                        = "clickhouse"
+	nameLabel                        = "__name__"
+	CLUSTER                          = "cluster"
+	DISTRIBUTED_TIME_SERIES_TABLE    = "distributed_time_series_v2"
+	DISTRIBUTED_TIME_SERIES_TABLE_V3 = "distributed_time_series_v3"
+	DISTRIBUTED_SAMPLES_TABLE        = "distributed_samples_v2"
+	TIME_SERIES_TABLE                = "time_series_v2"
+	TIME_SERIES_TABLE_V3             = "time_series_v3"
+	SAMPLES_TABLE                    = "samples_v2"
+	temporalityLabel                 = "__temporality__"
+	envLabel                         = "env"
 )
 
 // clickHouse implements storage interface for the ClickHouse.
@@ -138,6 +144,35 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
 	queries = append(queries, fmt.Sprintf(`
 		ALTER TABLE %s.%s ON CLUSTER %s MODIFY SETTING ttl_only_drop_parts = 1;`, database, TIME_SERIES_TABLE, CLUSTER))
 
+	// Add temporality column to time_series table
+	queries = append(queries, fmt.Sprintf(`
+		ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS temporality LowCardinality(String) DEFAULT 'Unspecified' CODEC(ZSTD(5))`, database, TIME_SERIES_TABLE, CLUSTER))
+
+	queries = append(queries, fmt.Sprintf(`
+		ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS temporality LowCardinality(String) DEFAULT 'Unspecified' CODEC(ZSTD(5))`, database, DISTRIBUTED_TIME_SERIES_TABLE, CLUSTER))
+
+	// Add set index on temporality column
+	queries = append(queries, fmt.Sprintf(`
+		ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS temporality_index temporality TYPE SET(3) GRANULARITY 1`, database, TIME_SERIES_TABLE, CLUSTER))
+
+	// Create a new table
+	queries = append(queries, fmt.Sprintf(`
+		CREATE TABLE IF NOT EXISTS %s.%s ON CLUSTER %s (
+			env LowCardinality(String) DEFAULT 'default',
+			temporality LowCardinality(String) DEFAULT 'Unspecified',
+			metric_name LowCardinality(String),
+			fingerprint UInt64 CODEC(Delta, ZSTD),
+			timestamp_ms Int64 CODEC(Delta, ZSTD),
+			labels String CODEC(ZSTD(5))
+		)
+		ENGINE = ReplacingMergeTree
+		PARTITION BY toDate(timestamp_ms / 1000)
+		ORDER BY (env, temporality, metric_name, fingerprint);`, database, TIME_SERIES_TABLE_V3, CLUSTER))
+
+	// Create a new distributed table
+	queries = append(queries, fmt.Sprintf(`
+		CREATE TABLE IF NOT EXISTS %s.%s ON CLUSTER %s AS %s.%s ENGINE = Distributed("%s", %s, %s, cityHash64(env, temporality, metric_name, fingerprint));`, database, DISTRIBUTED_TIME_SERIES_TABLE_V3, CLUSTER, database, TIME_SERIES_TABLE_V3, CLUSTER, database, TIME_SERIES_TABLE_V3))
+
 	options := &clickhouse.Options{
 		Addr: []string{dsnURL.Host},
 	}
@@ -253,14 +288,15 @@ func (ch *clickHouse) GetDBConn() interface{} {
 	return ch.conn
 }
 
-func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) error {
+func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metricNameToTemporality map[string]pmetric.AggregationTemporality) error {
 	// calculate fingerprints, map them to time series
 	fingerprints := make([]uint64, len(data.Timeseries))
 	timeSeries := make(map[uint64][]*prompb.Label, len(data.Timeseries))
-	fingerprintToName := make(map[uint64]string)
+	fingerprintToName := make(map[uint64]map[string]string)
 
 	for i, ts := range data.Timeseries {
 		var metricName string
+		var env string = "default"
 		labelsOverridden := make(map[string]*prompb.Label)
 		for _, label := range ts.Labels {
 			labelsOverridden[label.Name] = &prompb.Label{
@@ -270,16 +306,32 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) error {
 			if label.Name == nameLabel {
 				metricName = label.Value
 			}
+			if label.Name == semconv.AttributeDeploymentEnvironment || label.Name == sanitize(semconv.AttributeDeploymentEnvironment) {
+				env = label.Value
+			}
 		}
 		var labels []*prompb.Label
 		for _, l := range labelsOverridden {
 			labels = append(labels, l)
 		}
+		// add temporality label
+		if metricName != "" {
+			if t, ok := metricNameToTemporality[metricName]; ok {
+				labels = append(labels, &prompb.Label{
+					Name:  temporalityLabel,
+					Value: t.String(),
+				})
+			}
+		}
 		timeseries.SortLabels(labels)
 		f := timeseries.Fingerprint(labels)
 		fingerprints[i] = f
 		timeSeries[f] = labels
-		fingerprintToName[f] = metricName
+		if _, ok := fingerprintToName[f]; !ok {
+			fingerprintToName[f] = make(map[string]string)
+		}
+		fingerprintToName[f][nameLabel] = metricName
+		fingerprintToName[f][envLabel] = env
 	}
 	if len(fingerprints) != len(timeSeries) {
 		ch.l.Debugf("got %d fingerprints, but only %d of them were unique time series", len(fingerprints), len(timeSeries))
@@ -304,15 +356,16 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) error {
 		return err
 	}
 
-	statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (metric_name, timestamp_ms, fingerprint, labels) VALUES (?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE))
+	statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (metric_name, temporality, timestamp_ms, fingerprint, labels) VALUES (?, ?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE))
 	if err != nil {
 		return err
 	}
 	timestamp := model.Now().Time().UnixMilli()
 	for fingerprint, labels := range newTimeSeries {
 		encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128)))
 		err = statement.Append(
-			fingerprintToName[fingerprint],
+			fingerprintToName[fingerprint][nameLabel],
+			metricNameToTemporality[fingerprintToName[fingerprint][nameLabel]].String(),
 			timestamp,
 			fingerprint,
 			encodedLabels,
@@ -336,6 +389,48 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) error {
 		return err
 	}
 
+	// Write to time_series_v3 table
+	err = func() error {
+		ctx := context.Background()
+		err := ch.conn.Exec(ctx, `SET allow_experimental_object_type = 1`)
+		if err != nil {
+			return err
+		}
+
+		statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, timestamp_ms, labels) VALUES (?, ?, ?, ?, ?, ?)", ch.database, TIME_SERIES_TABLE_V3))
+		if err != nil {
+			return err
+		}
+		timestamp := model.Now().Time().UnixMilli()
+		for fingerprint, labels := range newTimeSeries {
+			encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128)))
+			err = statement.Append(
+				fingerprintToName[fingerprint][envLabel],
+				metricNameToTemporality[fingerprintToName[fingerprint][nameLabel]].String(),
+				fingerprintToName[fingerprint][nameLabel],
+				fingerprint,
+				timestamp,
+				encodedLabels,
+			)
+			if err != nil {
+				return err
+			}
+		}
+
+		start := time.Now()
+		err = statement.Send()
+		ctx, _ = tag.New(ctx,
+			tag.Upsert(exporterKey, string(component.DataTypeMetrics)),
+			tag.Upsert(tableKey, TIME_SERIES_TABLE_V3),
+		)
+		stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
+		return err
+	}()
+
+	if err != nil {
+		return err
+	}
+
 	metrics := map[string]usage.Metric{}
 	err = func() error {
 		ctx := context.Background()
@@ -367,7 +462,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) error {
 			}
 
 			err = statement.Append(
-				fingerprintToName[fingerprint],
+				fingerprintToName[fingerprint][nameLabel],
 				fingerprint,
 				s.Timestamp,
 				s.Value,
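
Net effect of the storage changes above: every series row now carries a temporality value, and the new v3 table additionally keys on env (taken from the deployment.environment resource attribute). A sketch of inspecting the new column with the same clickhouse-go driver the exporter uses — the server address and the signoz_metrics database name are assumptions for illustration, not taken from the commit:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/ClickHouse/clickhouse-go/v2"
)

func main() {
	ctx := context.Background()
	// assumed local ClickHouse and database name; adjust for your setup
	conn, err := clickhouse.Open(&clickhouse.Options{Addr: []string{"127.0.0.1:9000"}})
	if err != nil {
		log.Fatal(err)
	}
	// count stored series per temporality in the new v3 table
	rows, err := conn.Query(ctx, `
		SELECT temporality, count() AS series
		FROM signoz_metrics.distributed_time_series_v3
		GROUP BY temporality`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()
	for rows.Next() {
		var temporality string
		var series uint64
		if err := rows.Scan(&temporality, &series); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%s\t%d\n", temporality, series)
	}
}
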
108 changes: 56 additions & 52 deletions exporter/clickhousemetricsexporter/exporter.go
@@ -29,7 +29,6 @@ import (
 	"github.com/pkg/errors"
 	"go.opencensus.io/stats/view"
 	"go.uber.org/multierr"
-	"go.uber.org/zap"
 
 	"github.com/prometheus/prometheus/prompb"
 
@@ -47,18 +46,20 @@ const maxBatchByteSize = 3000000
 
 // PrwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint.
 type PrwExporter struct {
-	namespace       string
-	externalLabels  map[string]string
-	endpointURL     *url.URL
-	client          *http.Client
-	wg              *sync.WaitGroup
-	closeChan       chan struct{}
-	concurrency     int
-	userAgentHeader string
-	clientSettings  *confighttp.HTTPClientSettings
-	settings        component.TelemetrySettings
-	ch              base.Storage
-	usageCollector  *usage.UsageCollector
+	namespace               string
+	externalLabels          map[string]string
+	endpointURL             *url.URL
+	client                  *http.Client
+	wg                      *sync.WaitGroup
+	closeChan               chan struct{}
+	concurrency             int
+	userAgentHeader         string
+	clientSettings          *confighttp.HTTPClientSettings
+	settings                component.TelemetrySettings
+	ch                      base.Storage
+	usageCollector          *usage.UsageCollector
+	metricNameToTemporality map[string]pmetric.AggregationTemporality
+	mux                     *sync.Mutex
 }
 
 // NewPrwExporter initializes a new PrwExporter instance and sets fields accordingly.
@@ -107,17 +108,19 @@ func NewPrwExporter(cfg *Config, set exporter.CreateSettings) (*PrwExporter, error) {
 	}
 
 	return &PrwExporter{
-		namespace:       cfg.Namespace,
-		externalLabels:  sanitizedLabels,
-		endpointURL:     endpointURL,
-		wg:              new(sync.WaitGroup),
-		closeChan:       make(chan struct{}),
-		userAgentHeader: userAgentHeader,
-		concurrency:     cfg.RemoteWriteQueue.NumConsumers,
-		clientSettings:  &cfg.HTTPClientSettings,
-		settings:        set.TelemetrySettings,
-		ch:              ch,
-		usageCollector:  collector,
+		namespace:               cfg.Namespace,
+		externalLabels:          sanitizedLabels,
+		endpointURL:             endpointURL,
+		wg:                      new(sync.WaitGroup),
+		closeChan:               make(chan struct{}),
+		userAgentHeader:         userAgentHeader,
+		concurrency:             cfg.RemoteWriteQueue.NumConsumers,
+		clientSettings:          &cfg.HTTPClientSettings,
+		settings:                set.TelemetrySettings,
+		ch:                      ch,
+		usageCollector:          collector,
+		metricNameToTemporality: make(map[string]pmetric.AggregationTemporality),
+		mux:                     new(sync.Mutex),
 	}, nil
 }
 
@@ -167,37 +170,31 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) error {
 		// TODO: decide if scope information should be exported as labels
 		for k := 0; k < metricSlice.Len(); k++ {
 			metric := metricSlice.At(k)
+			var temporality pmetric.AggregationTemporality
 
-			// check for valid type and temporality combination and for matching data field and type
-			if ok := validateMetrics(metric); !ok {
-				dropped++
-				errs = multierr.Append(errs, consumererror.NewPermanent(errors.New("invalid temporality and type combination")))
-				serviceName, found := resource.Attributes().Get("service.name")
-				if !found {
-					serviceName = pcommon.NewValueStr("<missing-svc>")
-				}
-				metricType := metric.Type()
-				var numDataPoints int
-				var temporality pmetric.AggregationTemporality
-				switch metricType {
-				case pmetric.MetricTypeGauge:
-					numDataPoints = metric.Gauge().DataPoints().Len()
-				case pmetric.MetricTypeSum:
-					numDataPoints = metric.Sum().DataPoints().Len()
-					temporality = metric.Sum().AggregationTemporality()
-				case pmetric.MetricTypeHistogram:
-					numDataPoints = metric.Histogram().DataPoints().Len()
-					temporality = metric.Histogram().AggregationTemporality()
-				case pmetric.MetricTypeSummary:
-					numDataPoints = metric.Summary().DataPoints().Len()
-				default:
-				}
-				prwe.settings.Logger.Error("unsupported metric type and temporality combination", zap.Int("numDataPoints", numDataPoints), zap.Any("metricType", metricType), zap.Any("temporality", temporality), zap.String("service name", serviceName.AsString()))
-				continue
-			}
+			metricType := metric.Type()
+
+			switch metricType {
+			case pmetric.MetricTypeGauge:
+				temporality = pmetric.AggregationTemporalityUnspecified
+			case pmetric.MetricTypeSum:
+				temporality = metric.Sum().AggregationTemporality()
+			case pmetric.MetricTypeHistogram:
+				temporality = metric.Histogram().AggregationTemporality()
+			case pmetric.MetricTypeSummary:
+				temporality = pmetric.AggregationTemporalityUnspecified
+			default:
+			}
+			prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)] = temporality
+
+			if metricType == pmetric.MetricTypeHistogram || metricType == pmetric.MetricTypeSummary {
+				prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)+bucketStr] = temporality
+				prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)+countStr] = temporality
+				prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)+sumStr] = temporality
+			}
 
 			// handle individual metric based on type
-			switch metric.Type() {
+			switch metricType {
 			case pmetric.MetricTypeGauge:
 				dataPoints := metric.Gauge().DataPoints()
 				if err := prwe.addNumberDataPointSlice(dataPoints, tsMap, resource, metric); err != nil {
@@ -280,6 +277,13 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) []error {
 
 // export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order
 func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) []error {
+	prwe.mux.Lock()
+	// make a copy of metricNameToTemporality
+	metricNameToTemporality := make(map[string]pmetric.AggregationTemporality)
+	for k, v := range prwe.metricNameToTemporality {
+		metricNameToTemporality[k] = v
+	}
+	prwe.mux.Unlock()
 	var errs []error
 	// Calls the helper function to convert and batch the TsMap to the desired format
 	requests, err := batchTimeSeries(tsMap, maxBatchByteSize)
@@ -307,7 +311,7 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) []error {
 		defer wg.Done()
 
 		for request := range input {
-			err := prwe.ch.Write(ctx, request)
+			err := prwe.ch.Write(ctx, request, metricNameToTemporality)
 			if err != nil {
 				mu.Lock()
 				errs = append(errs, err)
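
One design note on the hunks above: PushMetrics populates prwe.metricNameToTemporality while export runs on worker goroutines, so export copies the map under prwe.mux and hands each worker a stable snapshot. (In the hunks shown here the writes in PushMetrics do not take the same lock, so the guard is only partial.) The snapshot pattern in isolation, as a standard-library-only sketch:

package main

import (
	"fmt"
	"sync"
)

// snapshot copies a shared map under a lock so the caller can iterate the
// copy freely while writers (holding the same lock) keep updating the original.
func snapshot(mu *sync.Mutex, shared map[string]string) map[string]string {
	mu.Lock()
	defer mu.Unlock()
	snap := make(map[string]string, len(shared))
	for k, v := range shared {
		snap[k] = v
	}
	return snap
}

func main() {
	var mu sync.Mutex
	temporalities := map[string]string{"requests_total": "Delta"}
	snap := snapshot(&mu, temporalities)
	fmt.Println(snap["requests_total"]) // prints "Delta"
}
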
4 changes: 2 additions & 2 deletions exporter/clickhousemetricsexporter/helper.go
@@ -69,9 +69,9 @@ func validateMetrics(metric pmetric.Metric) bool {
 	case pmetric.MetricTypeGauge:
 		return metric.Gauge().DataPoints().Len() != 0
 	case pmetric.MetricTypeSum:
-		return metric.Sum().DataPoints().Len() != 0 && metric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
+		return metric.Sum().DataPoints().Len() != 0
 	case pmetric.MetricTypeHistogram:
-		return metric.Histogram().DataPoints().Len() != 0 && metric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
+		return metric.Histogram().DataPoints().Len() != 0
 	case pmetric.MetricTypeSummary:
 		return metric.Summary().DataPoints().Len() != 0
 	}
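
With the cumulative-only conditions removed, validateMetrics only rejects metrics that carry no data points — and since PushMetrics no longer calls it after this commit, delta series flow through either way. A hypothetical in-package test (not part of the commit) sketching the relaxed check:

package clickhousemetricsexporter

import (
	"testing"

	"go.opentelemetry.io/collector/pdata/pmetric"
)

// Before this commit, validateMetrics required cumulative temporality for
// sums and histograms; a delta sum with at least one point now passes.
func TestValidateMetricsAcceptsDelta(t *testing.T) {
	m := pmetric.NewMetric()
	sum := m.SetEmptySum()
	sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
	sum.DataPoints().AppendEmpty()
	if !validateMetrics(m) {
		t.Fatal("expected delta sum to be accepted")
	}
}
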