Add experimental support for delta temporality #121

Merged · 7 commits · Jul 13, 2023
3 changes: 2 additions & 1 deletion exporter/clickhousemetricsexporter/base/base.go
@@ -23,6 +23,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/prometheus/prometheus/prompb"
)
@@ -34,7 +35,7 @@ type Storage interface {
// Read(context.Context, []Query) (*prompb.ReadResponse, error)

// Write puts data into storage.
Write(context.Context, *prompb.WriteRequest) error
Write(context.Context, *prompb.WriteRequest, map[string]pmetric.AggregationTemporality) error

// Returns the DB conn.
GetDBConn() interface{}
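For implementers of base.Storage, the change is mechanical: Write gains a map from metric name to its pmetric.AggregationTemporality. A minimal sketch of an implementation adapting to the new signature — memoryStorage is hypothetical, and any Storage methods elided from this hunk are omitted:

    // Hypothetical in-memory Storage, not part of this PR.
    package example

    import (
        "context"

        "github.com/prometheus/prometheus/prompb"
        "go.opentelemetry.io/collector/pdata/pmetric"
    )

    type memoryStorage struct {
        temporalities map[string]pmetric.AggregationTemporality
    }

    func (m *memoryStorage) Write(ctx context.Context, data *prompb.WriteRequest, metricNameToTemporality map[string]pmetric.AggregationTemporality) error {
        // Record the temporality reported for each metric name so readers
        // can later tell delta series apart from cumulative ones.
        for name, t := range metricNameToTemporality {
            m.temporalities[name] = t
        }
        return nil
    }

    func (m *memoryStorage) GetDBConn() interface{} { return nil }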
123 changes: 109 additions & 14 deletions exporter/clickhousemetricsexporter/clickhouse.go
@@ -31,6 +31,8 @@ import (
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pmetric"
semconv "go.opentelemetry.io/collector/semconv/v1.13.0"

"github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/base"
"github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/utils/timeseries"
Expand All @@ -39,14 +41,18 @@ import (
)

const (
namespace = "promhouse"
subsystem = "clickhouse"
nameLabel = "__name__"
CLUSTER = "cluster"
DISTRIBUTED_TIME_SERIES_TABLE = "distributed_time_series_v2"
DISTRIBUTED_SAMPLES_TABLE = "distributed_samples_v2"
TIME_SERIES_TABLE = "time_series_v2"
SAMPLES_TABLE = "samples_v2"
namespace = "promhouse"
subsystem = "clickhouse"
nameLabel = "__name__"
CLUSTER = "cluster"
DISTRIBUTED_TIME_SERIES_TABLE = "distributed_time_series_v2"
DISTRIBUTED_TIME_SERIES_TABLE_V3 = "distributed_time_series_v3"
DISTRIBUTED_SAMPLES_TABLE = "distributed_samples_v2"
TIME_SERIES_TABLE = "time_series_v2"
TIME_SERIES_TABLE_V3 = "time_series_v3"
SAMPLES_TABLE = "samples_v2"
temporalityLabel = "__temporality__"
envLabel = "env"
)

// clickHouse implements storage interface for the ClickHouse.
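pmetric.AggregationTemporality.String() produces exactly three values — "Unspecified" (the column default), "Delta", and "Cumulative" — which is presumably why the set index added below is declared SET(3). A quick sketch of the values the new temporality column and __temporality__ label can hold:

    package main

    import (
        "fmt"

        "go.opentelemetry.io/collector/pdata/pmetric"
    )

    func main() {
        for _, t := range []pmetric.AggregationTemporality{
            pmetric.AggregationTemporalityUnspecified, // "Unspecified"
            pmetric.AggregationTemporalityDelta,       // "Delta"
            pmetric.AggregationTemporalityCumulative,  // "Cumulative"
        } {
            fmt.Println(t.String())
        }
    }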
@@ -138,6 +144,35 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
queries = append(queries, fmt.Sprintf(`
ALTER TABLE %s.%s ON CLUSTER %s MODIFY SETTING ttl_only_drop_parts = 1;`, database, TIME_SERIES_TABLE, CLUSTER))

// Add temporality column to time_series table
queries = append(queries, fmt.Sprintf(`
ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS temporality LowCardinality(String) DEFAULT 'Unspecified' CODEC(ZSTD(5))`, database, TIME_SERIES_TABLE, CLUSTER))

queries = append(queries, fmt.Sprintf(`
ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS temporality LowCardinality(String) DEFAULT 'Unspecified' CODEC(ZSTD(5))`, database, DISTRIBUTED_TIME_SERIES_TABLE, CLUSTER))

// Add set index on temporality column
queries = append(queries, fmt.Sprintf(`
ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS temporality_index temporality TYPE SET(3) GRANULARITY 1`, database, TIME_SERIES_TABLE, CLUSTER))

// Create a new table
queries = append(queries, fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s.%s ON CLUSTER %s (
env LowCardinality(String) DEFAULT 'default',
temporality LowCardinality(String) DEFAULT 'Unspecified',
metric_name LowCardinality(String),
fingerprint UInt64 CODEC(Delta, ZSTD),
timestamp_ms Int64 CODEC(Delta, ZSTD),
labels String CODEC(ZSTD(5))
)
ENGINE = ReplacingMergeTree
PARTITION BY toDate(timestamp_ms / 1000)
ORDER BY (env, temporality, metric_name, fingerprint);`, database, TIME_SERIES_TABLE_V3, CLUSTER))

// Create a new distributed table
queries = append(queries, fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s.%s ON CLUSTER %s AS %s.%s ENGINE = Distributed("%s", %s, %s, cityHash64(env, temporality, metric_name, fingerprint));`, database, DISTRIBUTED_TIME_SERIES_TABLE_V3, CLUSTER, database, TIME_SERIES_TABLE_V3, CLUSTER, database, TIME_SERIES_TABLE_V3))

options := &clickhouse.Options{
Addr: []string{dsnURL.Host},
}
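Because time_series_v3 is ordered by (env, temporality, metric_name, fingerprint) and the distributed table shards on a cityHash64 of the same tuple, readers that filter on the leading columns of the ordering key can skip most granules. A hedged read-side sketch, not part of this PR — it would live alongside the constants above and assumes clickhouse-go v2's driver.Conn (import "github.com/ClickHouse/clickhouse-go/v2/lib/driver"):

    func distinctDeltaMetrics(ctx context.Context, conn driver.Conn, database string) ([]string, error) {
        // env and temporality form the ORDER BY prefix of time_series_v3,
        // so ClickHouse can prune granules that match neither predicate.
        rows, err := conn.Query(ctx, fmt.Sprintf(`
            SELECT DISTINCT metric_name
            FROM %s.%s
            WHERE env = 'default' AND temporality = 'Delta'`,
            database, DISTRIBUTED_TIME_SERIES_TABLE_V3))
        if err != nil {
            return nil, err
        }
        defer rows.Close()
        var names []string
        for rows.Next() {
            var name string
            if err := rows.Scan(&name); err != nil {
                return nil, err
            }
            names = append(names, name)
        }
        return names, rows.Err()
    }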
@@ -253,14 +288,15 @@ func (ch *clickHouse) GetDBConn() interface{} {
return ch.conn
}

func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) error {
func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metricNameToTemporality map[string]pmetric.AggregationTemporality) error {
// calculate fingerprints, map them to time series
fingerprints := make([]uint64, len(data.Timeseries))
timeSeries := make(map[uint64][]*prompb.Label, len(data.Timeseries))
fingerprintToName := make(map[uint64]string)
fingerprintToName := make(map[uint64]map[string]string)

for i, ts := range data.Timeseries {
var metricName string
var env string = "default"
labelsOverridden := make(map[string]*prompb.Label)
for _, label := range ts.Labels {
labelsOverridden[label.Name] = &prompb.Label{
@@ -270,16 +306,32 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) error {
if label.Name == nameLabel {
metricName = label.Value
}
if label.Name == semconv.AttributeDeploymentEnvironment || label.Name == sanitize(semconv.AttributeDeploymentEnvironment) {
env = label.Value
}
}
var labels []*prompb.Label
for _, l := range labelsOverridden {
labels = append(labels, l)
}
// add temporality label
if metricName != "" {
if t, ok := metricNameToTemporality[metricName]; ok {
labels = append(labels, &prompb.Label{
Name: temporalityLabel,
Value: t.String(),
})
}
}
timeseries.SortLabels(labels)
f := timeseries.Fingerprint(labels)
fingerprints[i] = f
timeSeries[f] = labels
fingerprintToName[f] = metricName
if _, ok := fingerprintToName[f]; !ok {
fingerprintToName[f] = make(map[string]string)
}
fingerprintToName[f][nameLabel] = metricName
fingerprintToName[f][envLabel] = env
}
if len(fingerprints) != len(timeSeries) {
ch.l.Debugf("got %d fingerprints, but only %d of them were unique time series", len(fingerprints), len(timeSeries))
@@ -304,15 +356,16 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) error {
return err
}

statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (metric_name, timestamp_ms, fingerprint, labels) VALUES (?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE))
statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (metric_name, temporality, timestamp_ms, fingerprint, labels) VALUES (?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE))
if err != nil {
return err
}
timestamp := model.Now().Time().UnixMilli()
for fingerprint, labels := range newTimeSeries {
encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128)))
err = statement.Append(
fingerprintToName[fingerprint],
fingerprintToName[fingerprint][nameLabel],
metricNameToTemporality[fingerprintToName[fingerprint][nameLabel]].String(),
timestamp,
fingerprint,
encodedLabels,
@@ -336,6 +389,48 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) error {
return err
}

// Write to time_series_v3 table
err = func() error {
ctx := context.Background()
err := ch.conn.Exec(ctx, `SET allow_experimental_object_type = 1`)
if err != nil {
return err
}

statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, timestamp_ms, labels) VALUES (?, ?, ?, ?, ?, ?)", ch.database, TIME_SERIES_TABLE_V3))
if err != nil {
return err
}
timestamp := model.Now().Time().UnixMilli()
for fingerprint, labels := range newTimeSeries {
encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128)))
err = statement.Append(
fingerprintToName[fingerprint][envLabel],
metricNameToTemporality[fingerprintToName[fingerprint][nameLabel]].String(),
fingerprintToName[fingerprint][nameLabel],
fingerprint,
timestamp,
encodedLabels,
)
if err != nil {
return err
}
}

start := time.Now()
err = statement.Send()
ctx, _ = tag.New(ctx,
tag.Upsert(exporterKey, string(component.DataTypeMetrics)),
tag.Upsert(tableKey, TIME_SERIES_TABLE_V3),
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
return err
}()

if err != nil {
return err
}

metrics := map[string]usage.Metric{}
err = func() error {
ctx := context.Background()
@@ -367,7 +462,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest) error {
}

err = statement.Append(
fingerprintToName[fingerprint],
fingerprintToName[fingerprint][nameLabel],
fingerprint,
s.Timestamp,
s.Value,
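One consequence of appending __temporality__ before fingerprinting (see the Write hunk above): a metric whose temporality changes — say, after a collector pipeline is reconfigured from cumulative to delta — hashes to a new fingerprint and starts a new series rather than extending the old one. A sketch of the effect using the package's own timeseries helpers:

    // Sketch, not part of this PR: one metric name, two temporalities,
    // two distinct fingerprints and therefore two series.
    cumulative := []*prompb.Label{
        {Name: "__name__", Value: "http_requests_total"},
        {Name: "__temporality__", Value: "Cumulative"},
    }
    delta := []*prompb.Label{
        {Name: "__name__", Value: "http_requests_total"},
        {Name: "__temporality__", Value: "Delta"},
    }
    timeseries.SortLabels(cumulative)
    timeseries.SortLabels(delta)
    fmt.Println(timeseries.Fingerprint(cumulative) != timeseries.Fingerprint(delta)) // true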
108 changes: 56 additions & 52 deletions exporter/clickhousemetricsexporter/exporter.go
@@ -29,7 +29,6 @@ import (
"github.com/pkg/errors"
"go.opencensus.io/stats/view"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/prometheus/prometheus/prompb"

@@ -47,18 +46,20 @@ const maxBatchByteSize = 3000000

// PrwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint.
type PrwExporter struct {
namespace string
externalLabels map[string]string
endpointURL *url.URL
client *http.Client
wg *sync.WaitGroup
closeChan chan struct{}
concurrency int
userAgentHeader string
clientSettings *confighttp.HTTPClientSettings
settings component.TelemetrySettings
ch base.Storage
usageCollector *usage.UsageCollector
namespace string
externalLabels map[string]string
endpointURL *url.URL
client *http.Client
wg *sync.WaitGroup
closeChan chan struct{}
concurrency int
userAgentHeader string
clientSettings *confighttp.HTTPClientSettings
settings component.TelemetrySettings
ch base.Storage
usageCollector *usage.UsageCollector
metricNameToTemporality map[string]pmetric.AggregationTemporality
mux *sync.Mutex
}

// NewPrwExporter initializes a new PrwExporter instance and sets fields accordingly.
@@ -107,17 +108,19 @@ func NewPrwExporter(cfg *Config, set exporter.CreateSettings) (*PrwExporter, error) {
}

return &PrwExporter{
namespace: cfg.Namespace,
externalLabels: sanitizedLabels,
endpointURL: endpointURL,
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
userAgentHeader: userAgentHeader,
concurrency: cfg.RemoteWriteQueue.NumConsumers,
clientSettings: &cfg.HTTPClientSettings,
settings: set.TelemetrySettings,
ch: ch,
usageCollector: collector,
namespace: cfg.Namespace,
externalLabels: sanitizedLabels,
endpointURL: endpointURL,
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
userAgentHeader: userAgentHeader,
concurrency: cfg.RemoteWriteQueue.NumConsumers,
clientSettings: &cfg.HTTPClientSettings,
settings: set.TelemetrySettings,
ch: ch,
usageCollector: collector,
metricNameToTemporality: make(map[string]pmetric.AggregationTemporality),
mux: new(sync.Mutex),
}, nil
}
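Since the exporter may run several consumers concurrently (cfg.RemoteWriteQueue.NumConsumers), the new metricNameToTemporality map is paired with a mutex. A minimal sketch of a guarded update — the recordTemporality helper is hypothetical, not part of this PR:

    // Hypothetical helper: serialize writes to the shared temporality map
    // so they cannot race with the snapshot taken in export (see below).
    func (prwe *PrwExporter) recordTemporality(name string, t pmetric.AggregationTemporality) {
        prwe.mux.Lock()
        defer prwe.mux.Unlock()
        prwe.metricNameToTemporality[name] = t
    }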

@@ -167,37 +170,31 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) error {
// TODO: decide if scope information should be exported as labels
for k := 0; k < metricSlice.Len(); k++ {
metric := metricSlice.At(k)
var temporality pmetric.AggregationTemporality

// check for valid type and temporality combination and for matching data field and type
if ok := validateMetrics(metric); !ok {
dropped++
errs = multierr.Append(errs, consumererror.NewPermanent(errors.New("invalid temporality and type combination")))
serviceName, found := resource.Attributes().Get("service.name")
if !found {
serviceName = pcommon.NewValueStr("<missing-svc>")
}
metricType := metric.Type()
var numDataPoints int
var temporality pmetric.AggregationTemporality
switch metricType {
case pmetric.MetricTypeGauge:
numDataPoints = metric.Gauge().DataPoints().Len()
case pmetric.MetricTypeSum:
numDataPoints = metric.Sum().DataPoints().Len()
temporality = metric.Sum().AggregationTemporality()
case pmetric.MetricTypeHistogram:
numDataPoints = metric.Histogram().DataPoints().Len()
temporality = metric.Histogram().AggregationTemporality()
case pmetric.MetricTypeSummary:
numDataPoints = metric.Summary().DataPoints().Len()
default:
}
prwe.settings.Logger.Error("unsupported metric type and temporality combination", zap.Int("numDataPoints", numDataPoints), zap.Any("metricType", metricType), zap.Any("temporality", temporality), zap.String("service name", serviceName.AsString()))
continue
metricType := metric.Type()

switch metricType {
case pmetric.MetricTypeGauge:
temporality = pmetric.AggregationTemporalityUnspecified
case pmetric.MetricTypeSum:
temporality = metric.Sum().AggregationTemporality()
case pmetric.MetricTypeHistogram:
temporality = metric.Histogram().AggregationTemporality()
case pmetric.MetricTypeSummary:
temporality = pmetric.AggregationTemporalityUnspecified
default:
}
prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)] = temporality

if metricType == pmetric.MetricTypeHistogram || metricType == pmetric.MetricTypeSummary {
prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)+bucketStr] = temporality
prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)+countStr] = temporality
prwe.metricNameToTemporality[getPromMetricName(metric, prwe.namespace)+sumStr] = temporality
}

// handle individual metric based on type
switch metric.Type() {
switch metricType {
case pmetric.MetricTypeGauge:
dataPoints := metric.Gauge().DataPoints()
if err := prwe.addNumberDataPointSlice(dataPoints, tsMap, resource, metric); err != nil {
@@ -280,6 +277,13 @@ func (prwe *PrwExporter) addNumberDataPointSlice(dataPoints pmetric.NumberDataPo

// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order
func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) []error {
prwe.mux.Lock()
// make a copy of metricNameToTemporality
metricNameToTemporality := make(map[string]pmetric.AggregationTemporality)
for k, v := range prwe.metricNameToTemporality {
metricNameToTemporality[k] = v
}
prwe.mux.Unlock()
var errs []error
// Calls the helper function to convert and batch the TsMap to the desired format
requests, err := batchTimeSeries(tsMap, maxBatchByteSize)
@@ -307,7 +311,7 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) []error {
defer wg.Done()

for request := range input {
err := prwe.ch.Write(ctx, request)
err := prwe.ch.Write(ctx, request, metricNameToTemporality)
if err != nil {
mu.Lock()
errs = append(errs, err)
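Note the copy-under-lock step at the top of export: each batch of writers receives a private snapshot of the temporality map, so they never read the live map while PushMetrics may be updating it. The same idiom in a generic form, illustrative only (requires Go 1.18+):

    // Copy a shared map while holding its lock; callers can then use the
    // returned copy without further synchronization.
    func snapshot[K comparable, V any](mu *sync.Mutex, src map[K]V) map[K]V {
        mu.Lock()
        defer mu.Unlock()
        dst := make(map[K]V, len(src))
        for k, v := range src {
            dst[k] = v
        }
        return dst
    }

Inside export, the copy loop above would collapse to metricNameToTemporality := snapshot(prwe.mux, prwe.metricNameToTemporality).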
4 changes: 2 additions & 2 deletions exporter/clickhousemetricsexporter/helper.go
@@ -69,9 +69,9 @@ func validateMetrics(metric pmetric.Metric) bool {
case pmetric.MetricTypeGauge:
return metric.Gauge().DataPoints().Len() != 0
case pmetric.MetricTypeSum:
return metric.Sum().DataPoints().Len() != 0 && metric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
return metric.Sum().DataPoints().Len() != 0
case pmetric.MetricTypeHistogram:
return metric.Histogram().DataPoints().Len() != 0 && metric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
return metric.Histogram().DataPoints().Len() != 0
case pmetric.MetricTypeSummary:
return metric.Summary().DataPoints().Len() != 0
}
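With the temporality conditions dropped, validateMetrics reduces to "has at least one data point", so delta sums and histograms are no longer rejected before reaching the exporter. A small sketch, assuming the pdata API imported elsewhere in this PR:

    // A delta sum now passes validation; previously only cumulative
    // temporality was accepted for sums and histograms.
    m := pmetric.NewMetric()
    m.SetName("http.server.requests")
    sum := m.SetEmptySum()
    sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
    sum.DataPoints().AppendEmpty().SetIntValue(1)
    fmt.Println(validateMetrics(m)) // true — non-empty data points suffice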