Add support for Delta temporality #2505

Merged · 16 commits · Jul 13, 2023
2 changes: 2 additions & 0 deletions ee/query-service/app/api/api.go
@@ -17,6 +17,7 @@ import (
type APIHandlerOptions struct {
DataConnector interfaces.DataConnector
SkipConfig *basemodel.SkipConfig
PreferDelta bool
AppDao dao.ModelDao
RulesManager *rules.Manager
FeatureFlags baseint.FeatureLookup
@@ -34,6 +35,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
baseHandler, err := baseapp.NewAPIHandler(baseapp.APIHandlerOpts{
Reader: opts.DataConnector,
SkipConfig: opts.SkipConfig,
PerferDelta: opts.PreferDelta,
AppDao: opts.AppDao,
RuleManager: opts.RulesManager,
FeatureFlags: opts.FeatureFlags})
2 changes: 2 additions & 0 deletions ee/query-service/app/server.go
@@ -56,6 +56,7 @@ type ServerOptions struct {
// alert specific params
DisableRules bool
RuleRepoURL string
PreferDelta bool
}

// Server runs HTTP api service
@@ -170,6 +171,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
apiOpts := api.APIHandlerOptions{
DataConnector: reader,
SkipConfig: skipConfig,
PreferDelta: serverOptions.PreferDelta,
AppDao: modelDao,
RulesManager: rm,
FeatureFlags: lm,
3 changes: 3 additions & 0 deletions ee/query-service/main.go
@@ -83,10 +83,12 @@ func main() {
var ruleRepoURL string

var enableQueryServiceLogOTLPExport bool
var preferDelta bool

flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
flag.BoolVar(&preferDelta, "prefer-delta", false, "(prefer delta over raw metrics)")
flag.StringVar(&ruleRepoURL, "rules.repo-url", baseconst.AlertHelpPage, "(host address used to build rule link in alert messages)")
flag.BoolVar(&enableQueryServiceLogOTLPExport, "enable.query.service.log.otlp.export", false, "(enable query service log otlp export)")
flag.Parse()
@@ -102,6 +104,7 @@ func main() {
HTTPHostPort: baseconst.HTTPHostPort,
PromConfigPath: promConfigPath,
SkipTopLvlOpsPath: skipTopLvlOpsPath,
PreferDelta: preferDelta,
PrivateHostPort: baseconst.PrivateHostPort,
DisableRules: disableRules,
RuleRepoURL: ruleRepoURL,
26 changes: 26 additions & 0 deletions pkg/query-service/app/clickhouseReader/reader.go
@@ -3249,6 +3249,32 @@ func (r *ClickHouseReader) GetSpansInLastHeartBeatInterval(ctx context.Context)
return spansInLastHeartBeatInterval, nil
}

func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []string) (map[string]map[v3.Temporality]bool, error) {

metricNameToTemporality := make(map[string]map[v3.Temporality]bool)

query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN [$1]`, signozMetricDBName, signozTSTableName)

rows, err := r.db.Query(ctx, query, metricNames)
if err != nil {
return nil, err
}
defer rows.Close()

for rows.Next() {
var metricName, temporality string
err := rows.Scan(&metricName, &temporality)
if err != nil {
return nil, err
}
if _, ok := metricNameToTemporality[metricName]; !ok {
metricNameToTemporality[metricName] = make(map[v3.Temporality]bool)
}
metricNameToTemporality[metricName][v3.Temporality(temporality)] = true
}
return metricNameToTemporality, nil
}

// func sum(array []tsByMetricName) uint64 {
// var result uint64
// result = 0
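A note on the shape of the data `FetchTemporality` returns: for each requested metric name it collects the set of temporalities that actually exist in the time-series table. Below is a minimal, self-contained sketch of how a caller might inspect that map; the `supportsDelta` helper and the sample metric names are illustrative only and not part of this PR (`v3.Temporality` is a string-based type, which is why the sketch uses plain strings).

```go
// Illustrative only: the shape of the map FetchTemporality builds from the
// DISTINCT (metric_name, temporality) rows, plus a hypothetical lookup helper.
// In the real code the inner key is v3.Temporality, a string-based type.
package main

import "fmt"

// supportsDelta reports whether a metric was seen with Delta temporality.
func supportsDelta(m map[string]map[string]bool, metric string) bool {
	return m[metric]["Delta"]
}

func main() {
	// Example data; metric names are made up for illustration.
	temporalities := map[string]map[string]bool{
		"http_requests_total": {"Delta": true, "Cumulative": true},
		"process_cpu_seconds": {"Cumulative": true},
	}
	fmt.Println(supportsDelta(temporalities, "http_requests_total")) // true
	fmt.Println(supportsDelta(temporalities, "process_cpu_seconds")) // false
}
```

A metric can legitimately map to more than one temporality when different producers report it differently, which is why the inner value is a set rather than a single string.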
55 changes: 55 additions & 0 deletions pkg/query-service/app/http_handler.go
@@ -71,6 +71,7 @@ type APIHandler struct {
featureFlags interfaces.FeatureLookup
ready func(http.HandlerFunc) http.HandlerFunc
queryBuilder *queryBuilder.QueryBuilder
preferDelta bool

// SetupCompleted indicates if SigNoz is ready for general use.
// at the moment, we mark the app ready when the first user
@@ -84,6 +85,8 @@ type APIHandlerOpts struct {
Reader interfaces.Reader

SkipConfig *model.SkipConfig

PerferDelta bool
// dao layer to perform crud on app objects like dashboard, alerts etc
AppDao dao.ModelDao

@@ -106,6 +109,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
reader: opts.Reader,
appDao: opts.AppDao,
skipConfig: opts.SkipConfig,
preferDelta: opts.PerferDelta,
alertManager: alertManager,
ruleManager: opts.RuleManager,
featureFlags: opts.FeatureFlags,
@@ -452,6 +456,48 @@ func (aH *APIHandler) metricAutocompleteTagValue(w http.ResponseWriter, r *http.
aH.Respond(w, tagValueList)
}

func (aH *APIHandler) addTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error {

metricNames := make([]string, 0)
metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 {
for _, query := range qp.CompositeQuery.BuilderQueries {
if query.DataSource == v3.DataSourceMetrics {
metricNames = append(metricNames, query.AggregateAttribute.Key)
if _, ok := metricNameToTemporality[query.AggregateAttribute.Key]; !ok {
metricNameToTemporality[query.AggregateAttribute.Key] = make(map[v3.Temporality]bool)
}
}
}
}

var err error

if aH.preferDelta {
zap.S().Debug("fetching metric temporality")
metricNameToTemporality, err = aH.reader.FetchTemporality(ctx, metricNames)
if err != nil {
return err
}
}

if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 {
for name := range qp.CompositeQuery.BuilderQueries {
query := qp.CompositeQuery.BuilderQueries[name]
if query.DataSource == v3.DataSourceMetrics {
if aH.preferDelta && metricNameToTemporality[query.AggregateAttribute.Key][v3.Delta] {
query.Temporality = v3.Delta
} else if metricNameToTemporality[query.AggregateAttribute.Key][v3.Cumulative] {
query.Temporality = v3.Cumulative
} else {
query.Temporality = v3.Unspecified
}
}
}
}
return nil
}

func (aH *APIHandler) QueryRangeMetricsV2(w http.ResponseWriter, r *http.Request) {
metricsQueryRangeParams, apiErrorObj := parser.ParseMetricQueryRangeParams(r)

@@ -2776,6 +2822,15 @@ func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) {
return
}

// add temporality for each metric

temporalityErr := aH.addTemporality(r.Context(), queryRangeParams)
if temporalityErr != nil {
zap.S().Errorf("Error while adding temporality for metrics: %v", temporalityErr)
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil)
return
}

aH.queryRangeV3(r.Context(), queryRangeParams, w, r)
}

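To summarize the if/else chain at the end of `addTemporality`: a metrics builder query gets `Delta` when the service was started with `-prefer-delta` and the metric has delta samples, falls back to `Cumulative` when cumulative samples are known to exist, and is otherwise left `Unspecified`. Note that temporalities are only fetched from ClickHouse when the flag is set, so without it every query falls through to `Unspecified`, which the query builder treats like the cumulative case. A small self-contained sketch of that precedence follows; `chooseTemporality` is an illustrative helper, while the real handler assigns `query.Temporality` in place.

```go
// Minimal sketch of the temporality-selection rule used by addTemporality.
// chooseTemporality is a hypothetical helper; the real handler mutates each
// metrics builder query directly.
package main

import "fmt"

type Temporality string

const (
	Delta       Temporality = "Delta"
	Cumulative  Temporality = "Cumulative"
	Unspecified Temporality = "Unspecified"
)

func chooseTemporality(preferDelta bool, available map[Temporality]bool) Temporality {
	switch {
	case preferDelta && available[Delta]:
		return Delta
	case available[Cumulative]:
		return Cumulative
	default:
		return Unspecified
	}
}

func main() {
	avail := map[Temporality]bool{Delta: true, Cumulative: true}
	fmt.Println(chooseTemporality(true, avail))  // Delta
	fmt.Println(chooseTemporality(false, avail)) // Cumulative
	fmt.Println(chooseTemporality(true, nil))    // Unspecified
}
```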
174 changes: 174 additions & 0 deletions pkg/query-service/app/metrics/v3/delta.go
@@ -0,0 +1,174 @@
package v3

import (
"fmt"

"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils"
)

func buildDeltaMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName string) (string, error) {

metricQueryGroupBy := mq.GroupBy

// if the aggregate operator is a histogram quantile and the user has not
// included the le tag in the group by, add it
if mq.AggregateOperator == v3.AggregateOperatorHistQuant50 ||
mq.AggregateOperator == v3.AggregateOperatorHistQuant75 ||
mq.AggregateOperator == v3.AggregateOperatorHistQuant90 ||
mq.AggregateOperator == v3.AggregateOperatorHistQuant95 ||
mq.AggregateOperator == v3.AggregateOperatorHistQuant99 {
found := false
for _, tag := range mq.GroupBy {
if tag.Key == "le" {
found = true
break
}
}
if !found {
metricQueryGroupBy = append(
metricQueryGroupBy,
v3.AttributeKey{
Key: "le",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: false,
},
)
}
}

if mq.Filters != nil {
mq.Filters.Items = append(mq.Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{Key: "__temporality__"},
Operator: v3.FilterOperatorEqual,
Value: "Delta",
})
}

filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq)
if err != nil {
return "", err
}

samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)

// Select the aggregate value for interval
queryTmpl :=
"SELECT %s" +
" toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
" %s as value" +
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
" GLOBAL INNER JOIN" +
" (%s) as filtered_time_series" +
" USING fingerprint" +
" WHERE " + samplesTableTimeFilter +
" GROUP BY %s" +
" ORDER BY %s ts"

// tagsWithoutLe groups by all tags except le: for histogram quantiles the
// inner per-bucket query keeps le, and the outer histogramQuantile
// aggregation drops it when combining buckets
tagsWithoutLe := []string{}
for _, tag := range mq.GroupBy {
if tag.Key != "le" {
tagsWithoutLe = append(tagsWithoutLe, tag.Key)
}
}

groupByWithoutLe := groupBy(tagsWithoutLe...)
groupTagsWithoutLe := groupSelect(tagsWithoutLe...)
orderWithoutLe := orderBy(mq.OrderBy, tagsWithoutLe)

groupBy := groupByAttributeKeyTags(metricQueryGroupBy...)
groupTags := groupSelectAttributeKeyTags(metricQueryGroupBy...)
orderBy := orderByAttributeKeyTags(mq.OrderBy, metricQueryGroupBy)

if len(orderBy) != 0 {
orderBy += ","
}
if len(orderWithoutLe) != 0 {
orderWithoutLe += ","
}

switch mq.AggregateOperator {
case v3.AggregateOperatorRate:
// Calculate rate of change of metric for each unique time series
groupBy = "fingerprint, ts"
orderBy = "fingerprint, "
groupTags = "fingerprint,"
op := fmt.Sprintf("sum(value)/%d", step)
query := fmt.Sprintf(
queryTmpl, "any(labels) as fullLabels, "+groupTags, step, op, filterSubQuery, groupBy, orderBy,
) // labels will be same so any should be fine

return query, nil
case v3.AggregateOperatorSumRate, v3.AggregateOperatorAvgRate, v3.AggregateOperatorMaxRate, v3.AggregateOperatorMinRate:
op := fmt.Sprintf("%s(value)/%d", aggregateOperatorToSQLFunc[mq.AggregateOperator], step)
query := fmt.Sprintf(
queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy,
)
return query, nil
case
v3.AggregateOperatorRateSum,
v3.AggregateOperatorRateMax,
v3.AggregateOperatorRateAvg,
v3.AggregateOperatorRateMin:
op := fmt.Sprintf("%s(value)/%d", aggregateOperatorToSQLFunc[mq.AggregateOperator], step)
query := fmt.Sprintf(
queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy,
)
return query, nil
case
v3.AggregateOperatorP05,
v3.AggregateOperatorP10,
v3.AggregateOperatorP20,
v3.AggregateOperatorP25,
v3.AggregateOperatorP50,
v3.AggregateOperatorP75,
v3.AggregateOperatorP90,
v3.AggregateOperatorP95,
v3.AggregateOperatorP99:
op := fmt.Sprintf("quantile(%v)(value)", aggregateOperatorToPercentile[mq.AggregateOperator])
query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy)
return query, nil
case v3.AggregateOperatorHistQuant50, v3.AggregateOperatorHistQuant75, v3.AggregateOperatorHistQuant90, v3.AggregateOperatorHistQuant95, v3.AggregateOperatorHistQuant99:
op := fmt.Sprintf("sum(value)/%d", step)
query := fmt.Sprintf(
queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy,
) // labels will be same so any should be fine
value := aggregateOperatorToPercentile[mq.AggregateOperator]

query = fmt.Sprintf(`SELECT %s ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTagsWithoutLe, value, query, groupByWithoutLe, orderWithoutLe)
return query, nil
case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax:
op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator])
query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy)
return query, nil
case v3.AggregateOperatorCount:
op := "toFloat64(count(*))"
query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy)
return query, nil
case v3.AggregateOperatorCountDistinct:
op := "toFloat64(count(distinct(value)))"
query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy)
return query, nil
case v3.AggregateOperatorNoOp:
queryTmpl :=
"SELECT fingerprint, labels as fullLabels," +
" toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
" any(value) as value" +
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
" GLOBAL INNER JOIN" +
" (%s) as filtered_time_series" +
" USING fingerprint" +
" WHERE " + samplesTableTimeFilter +
" GROUP BY fingerprint, labels, ts" +
" ORDER BY fingerprint, labels, ts"
query := fmt.Sprintf(queryTmpl, step, filterSubQuery)
return query, nil
default:
return "", fmt.Errorf("unsupported aggregate operator")
}
}
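The reason this delta path is structurally simpler than the cumulative one: a delta sample already carries the increase since the previous report, so a per-interval rate reduces to `sum(value)/step`, whereas cumulative counters need point-to-point differences. That is also why none of the cases above use the `rateWithoutNegative` expression. A rough sketch of the distinction, with illustrative helpers that are not part of the PR and that ignore counter resets:

```go
// Conceptual sketch: delta samples are per-report increments, so the rate over
// a window is just their sum divided by the step; cumulative counters require
// differencing adjacent samples. Both helpers are illustrative only.
package main

import "fmt"

func deltaRate(increments []float64, stepSeconds float64) float64 {
	var sum float64
	for _, v := range increments {
		sum += v
	}
	return sum / stepSeconds
}

func cumulativeRate(samples []float64, stepSeconds float64) float64 {
	// The increase over the window is last - first, the spirit of the
	// runningDifference-based SQL used for cumulative metrics.
	if len(samples) < 2 {
		return 0
	}
	return (samples[len(samples)-1] - samples[0]) / stepSeconds
}

func main() {
	fmt.Println(deltaRate([]float64{5, 7, 6}, 60))                 // 0.3
	fmt.Println(cumulativeRate([]float64{100, 105, 112, 118}, 60)) // 0.3
}
```

Because no windowed difference is needed, the delta rate cases can aggregate directly in a single GROUP BY instead of post-processing a per-fingerprint series.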
22 changes: 17 additions & 5 deletions pkg/query-service/app/metrics/v3/query_builder.go
@@ -44,13 +44,19 @@ var aggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{
}

// See https://github.com/SigNoz/signoz/issues/2151#issuecomment-1467249056
var rateWithoutNegative = `if (runningDifference(value) < 0 OR runningDifference(ts) <= 0, nan, runningDifference(value)/runningDifference(ts))`
var rateWithoutNegative = `if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) `

// buildMetricsTimeSeriesFilterQuery builds the sub-query to be used for filtering
// timeseries based on search criteria
func buildMetricsTimeSeriesFilterQuery(fs *v3.FilterSet, groupTags []v3.AttributeKey, metricName string, aggregateOperator v3.AggregateOperator) (string, error) {
func buildMetricsTimeSeriesFilterQuery(fs *v3.FilterSet, groupTags []v3.AttributeKey, mq *v3.BuilderQuery) (string, error) {
metricName := mq.AggregateAttribute.Key
aggregateOperator := mq.AggregateOperator
var conditions []string
conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(metricName)))
if mq.Temporality == v3.Delta {
conditions = append(conditions, fmt.Sprintf("metric_name = %s AND temporality = '%s' ", utils.ClickHouseFormattedValue(metricName), v3.Delta))
} else {
conditions = append(conditions, fmt.Sprintf("metric_name = %s AND temporality IN ['%s', '%s']", utils.ClickHouseFormattedValue(metricName), v3.Cumulative, v3.Unspecified))
}

if fs != nil && len(fs.Items) != 0 {
for _, item := range fs.Items {
@@ -157,7 +163,7 @@ func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str
}
}

filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq.AggregateAttribute.Key, mq.AggregateOperator)
filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq)
if err != nil {
return "", err
}
@@ -413,7 +419,7 @@ func reduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v
}

func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery) (string, error) {
query, err := buildMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME)
var query string
var err error
if mq.Temporality == v3.Delta {
query, err = buildDeltaMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME)
} else {
query, err = buildMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME)
}
if err != nil {
return "", err
}
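Regarding the updated `rateWithoutNegative` expression in this file: instead of returning `nan` whenever the counter decreases, the new form treats a drop as a counter reset and uses the current value itself as the increase since that reset; `nan` is now reserved for a non-positive timestamp delta. A self-contained sketch of the same logic in Go (`rateAt` is an illustrative helper, not code from this PR):

```go
// Sketch of the counter-reset handling in the updated rateWithoutNegative
// expression: a drop in the counter is treated as a reset and the current
// value is used as the increase since that reset; NaN is returned only when
// the timestamp delta is non-positive.
package main

import (
	"fmt"
	"math"
)

func rateAt(prevValue, value, prevTS, ts float64) float64 {
	dt := ts - prevTS
	if dt <= 0 {
		return math.NaN()
	}
	dv := value - prevValue
	if dv < 0 {
		// counter reset: assume the counter restarted from zero
		return value / dt
	}
	return dv / dt
}

func main() {
	fmt.Println(rateAt(100, 160, 0, 60))  // 1: normal increase
	fmt.Println(rateAt(100, 30, 60, 120)) // 0.5: reset, 30/60
	fmt.Println(rateAt(100, 160, 60, 60)) // NaN: no time elapsed
}
```

The practical effect is that rate series stay continuous across counter resets rather than showing gaps.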