From d88f3594a62582a16fa04528a56ff075f628bf4e Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sat, 18 Mar 2023 04:18:26 +0530 Subject: [PATCH 01/10] Revert "upgraded some deprecated packages (#2423)" This reverts commit c4b052c51ec3a22e25923aa92fb66665e6ca589b. --- .../TimePreferenceDropDown/index.tsx | 19 ++--- .../container/ConfigDropdown/Config/styles.ts | 17 ++++ .../src/container/ConfigDropdown/index.tsx | 23 +++-- .../Header/{SignedIn => SignedInAs}/index.tsx | 21 ++--- frontend/src/container/Header/index.tsx | 84 +++++++------------ .../src/container/ListOfDashboard/index.tsx | 23 +---- .../src/container/LogsSearchFilter/index.tsx | 13 +-- 7 files changed, 83 insertions(+), 117 deletions(-) create mode 100644 frontend/src/container/ConfigDropdown/Config/styles.ts rename frontend/src/container/Header/{SignedIn => SignedInAs}/index.tsx (69%) diff --git a/frontend/src/components/TimePreferenceDropDown/index.tsx b/frontend/src/components/TimePreferenceDropDown/index.tsx index ff6d31bcc1..8774edbdc9 100644 --- a/frontend/src/components/TimePreferenceDropDown/index.tsx +++ b/frontend/src/components/TimePreferenceDropDown/index.tsx @@ -1,9 +1,9 @@ -import { Button, Dropdown } from 'antd'; +import { Button, Dropdown, Menu } from 'antd'; import TimeItems, { timePreferance, timePreferenceType, } from 'container/NewWidget/RightContainer/timeItems'; -import React, { useCallback, useMemo } from 'react'; +import React, { useCallback } from 'react'; import { menuItems } from './config'; import { TextContainer } from './styles'; @@ -22,18 +22,13 @@ function TimePreference({ [setSelectedTime], ); - const menu = useMemo( - () => ({ - items: menuItems, - onClick: timeMenuItemOnChangeHandler, - }), - [timeMenuItemOnChangeHandler], - ); - return ( - - + } + > + + ); } diff --git a/frontend/src/container/ConfigDropdown/Config/styles.ts b/frontend/src/container/ConfigDropdown/Config/styles.ts new file mode 100644 index 0000000000..4807ea77c2 --- /dev/null +++ 
b/frontend/src/container/ConfigDropdown/Config/styles.ts @@ -0,0 +1,17 @@ +import { Menu } from 'antd'; +import styled from 'styled-components'; + +export const MenuDropdown = styled(Menu)` + &&& { + .ant-dropdown, + .ant-dropdown-menu, + .ant-dropdown-menu-item { + padding: 0px; + } + .ant-menu-item { + height: 1.75rem; + display: flex; + align-items: center; + } + } +`; diff --git a/frontend/src/container/ConfigDropdown/index.tsx b/frontend/src/container/ConfigDropdown/index.tsx index 1ddd676948..8390e09167 100644 --- a/frontend/src/container/ConfigDropdown/index.tsx +++ b/frontend/src/container/ConfigDropdown/index.tsx @@ -13,6 +13,7 @@ import { ConfigProps } from 'types/api/dynamicConfigs/getDynamicConfigs'; import AppReducer from 'types/reducer/app'; import HelpToolTip from './Config'; +import { MenuDropdown } from './Config/styles'; function DynamicConfigDropdown({ frontendId, @@ -33,15 +34,13 @@ function DynamicConfigDropdown({ setIsHelpDropDownOpen(!isHelpDropDownOpen); }; - const menu = useMemo( - () => ({ - items: [ - { - key: '1', - label: , - }, - ], - }), + const menuItems = useMemo( + () => [ + { + key: '1', + label: , + }, + ], [config], ); @@ -54,10 +53,10 @@ function DynamicConfigDropdown({ return ( } + visible={isHelpDropDownOpen} > ((state) => state.app); - const onManageAccountClick = useCallback(() => { - onToggle(); - history.push(ROUTES.MY_SETTINGS); - }, [onToggle]); - if (!user) { return
; } @@ -35,7 +30,11 @@ function SignedIn({ onToggle }: SignedInProps): JSX.Element { {email}
- + { + history.push(ROUTES.MY_SETTINGS); + }} + > Manage Account @@ -43,8 +42,4 @@ function SignedIn({ onToggle }: SignedInProps): JSX.Element { ); } -interface SignedInProps { - onToggle: VoidFunction; -} - -export default SignedIn; +export default SignedInAS; diff --git a/frontend/src/container/Header/index.tsx b/frontend/src/container/Header/index.tsx index a34287e665..9f04454d33 100644 --- a/frontend/src/container/Header/index.tsx +++ b/frontend/src/container/Header/index.tsx @@ -3,19 +3,12 @@ import { CaretUpFilled, LogoutOutlined, } from '@ant-design/icons'; -import type { MenuProps } from 'antd'; -import { Divider, Dropdown, Space, Typography } from 'antd'; +import { Divider, Dropdown, Menu, Space, Typography } from 'antd'; import { Logout } from 'api/utils'; import ROUTES from 'constants/routes'; import Config from 'container/ConfigDropdown'; import { useIsDarkMode, useThemeMode } from 'hooks/useDarkMode'; -import React, { - Dispatch, - SetStateAction, - useCallback, - useMemo, - useState, -} from 'react'; +import React, { Dispatch, SetStateAction, useCallback, useState } from 'react'; import { useSelector } from 'react-redux'; import { NavLink } from 'react-router-dom'; import { AppState } from 'store/reducers'; @@ -23,7 +16,7 @@ import AppReducer from 'types/reducer/app'; import CurrentOrganization from './CurrentOrganization'; import ManageLicense from './ManageLicense'; -import SignedIn from './SignedIn'; +import SignedInAS from './SignedInAs'; import { AvatarWrapper, Container, @@ -50,45 +43,32 @@ function HeaderContainer(): JSX.Element { [], ); - const onLogoutKeyDown = useCallback( - (e: React.KeyboardEvent) => { - if (e.key === 'Enter' || e.key === 'Space') { - Logout(); - } - }, - [], - ); - - const menu: MenuProps = useMemo( - () => ({ - items: [ - { - key: 'main-menu', - label: ( -
- - - - - - - - -
- Logout -
-
-
- ), - }, - ], - }), - [onToggleHandler, onLogoutKeyDown], + const menu = ( + + + + + + + + + + +
{ + if (e.key === 'Enter' || e.key === 'Space') { + Logout(); + } + }} + role="button" + onClick={Logout} + > + Logout +
+
+
+
); return ( @@ -118,10 +98,10 @@ function HeaderContainer(): JSX.Element { /> {user?.name[0]} diff --git a/frontend/src/container/ListOfDashboard/index.tsx b/frontend/src/container/ListOfDashboard/index.tsx index ca5d68d9bb..b513df0c6a 100644 --- a/frontend/src/container/ListOfDashboard/index.tsx +++ b/frontend/src/container/ListOfDashboard/index.tsx @@ -1,12 +1,5 @@ import { PlusOutlined } from '@ant-design/icons'; -import { - Card, - Dropdown, - MenuProps, - Row, - TableColumnProps, - Typography, -} from 'antd'; +import { Card, Dropdown, Menu, Row, TableColumnProps, Typography } from 'antd'; import { ItemType } from 'antd/es/menu/hooks/useItems'; import createDashboard from 'api/dashboard/create'; import { AxiosError } from 'axios'; @@ -54,12 +47,10 @@ function ListOfAllDashboard(): JSX.Element { ); const { t } = useTranslation('dashboard'); - const [ isImportJSONModalVisible, setIsImportJSONModalVisible, ] = useState(false); - const [uploadedGrafana, setUploadedGrafana] = useState(false); const [filteredDashboards, setFilteredDashboards] = useState(); @@ -67,7 +58,6 @@ function ListOfAllDashboard(): JSX.Element { useEffect(() => { setFilteredDashboards(dashboards); }, [dashboards]); - const [newDashboardState, setNewDashboardState] = useState({ loading: false, error: false, @@ -225,12 +215,7 @@ function ListOfAllDashboard(): JSX.Element { return menuItems; }, [createNewDashboard, loading, onNewDashboardHandler, t]); - const menu: MenuProps = useMemo( - () => ({ - items: getMenuItems(), - }), - [getMenuItems], - ); + const menuItems = getMenuItems(); const GetHeader = useMemo( () => ( @@ -245,7 +230,7 @@ function ListOfAllDashboard(): JSX.Element { }} /> {newDashboard && ( - + }> } type="primary" @@ -264,7 +249,7 @@ function ListOfAllDashboard(): JSX.Element { newDashboard, newDashboardState.error, newDashboardState.loading, - menu, + menuItems, ], ); diff --git a/frontend/src/container/LogsSearchFilter/index.tsx 
b/frontend/src/container/LogsSearchFilter/index.tsx index 75f95769f1..670c58cf70 100644 --- a/frontend/src/container/LogsSearchFilter/index.tsx +++ b/frontend/src/container/LogsSearchFilter/index.tsx @@ -173,13 +173,6 @@ function SearchFilter({ globalTime.minTime, ]); - const onPopOverChange = useCallback( - (isVisible: boolean) => { - onDropDownToggleHandler(isVisible)(); - }, - [onDropDownToggleHandler], - ); - return ( { + onDropDownToggleHandler(value)(); + }} > Date: Sat, 18 Mar 2023 19:34:12 +0530 Subject: [PATCH 02/10] chore: delta working with QB --- .../app/clickhouseReader/reader.go | 20 +++++++ pkg/query-service/app/http_handler.go | 27 +++++++++ .../app/metrics/query_builder.go | 60 ++++++++++++++++--- pkg/query-service/interfaces/interface.go | 1 + pkg/query-service/model/queryParams.go | 9 +++ 5 files changed, 110 insertions(+), 7 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 23e9f6a9fa..8deffe323c 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -2166,6 +2166,7 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query } err := r.db.Select(ctx, &SpanAggregatesDBResponseItems, query, args...) + fmt.Println(args...) 
zap.S().Info(query) @@ -3219,6 +3220,25 @@ func (r *ClickHouseReader) GetSpansInLastHeartBeatInterval(ctx context.Context) return spansInLastHeartBeatInterval, nil } +func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNameToTemporality map[string]model.Temporality) (map[string]model.Temporality, error) { + + query := fmt.Sprintf(`SELECT temporality FROM %s.%s WHERE metric_name = $1 LIMIT 1`, signozMetricDBName, signozTSTableName) + + for name := range metricNameToTemporality { + var temporality string + err := r.db.QueryRow(ctx, query, name).Scan(&temporality) + if err != nil { + zap.S().Error("unexpected error", zap.Error(err)) + } + if temporality == "Cumulative" { + metricNameToTemporality[name] = model.CUMULATIVE + } else if temporality == "Delta" { + metricNameToTemporality[name] = model.DELTA + } + } + return metricNameToTemporality, nil +} + // func sum(array []tsByMetricName) uint64 { // var result uint64 // result = 0 diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 62ebeeeedf..1a5153571f 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -431,6 +431,29 @@ func (aH *APIHandler) metricAutocompleteTagValue(w http.ResponseWriter, r *http. 
aH.Respond(w, tagValueList) } +func (aH *APIHandler) addTemporality(ctx context.Context, qp *model.QueryRangeParamsV2) { + + metricNameToTemporality := make(map[string]model.Temporality) + + if qp.CompositeMetricQuery != nil && len(qp.CompositeMetricQuery.BuilderQueries) > 0 { + for name := range qp.CompositeMetricQuery.BuilderQueries { + mq := qp.CompositeMetricQuery.BuilderQueries[name] + metricNameToTemporality[mq.MetricName] = model.CUMULATIVE + } + } + + metricNameToTemporality, _ = aH.reader.FetchTemporality(ctx, metricNameToTemporality) + fmt.Println(metricNameToTemporality) + + if qp.CompositeMetricQuery != nil && len(qp.CompositeMetricQuery.BuilderQueries) > 0 { + for name := range qp.CompositeMetricQuery.BuilderQueries { + mq := qp.CompositeMetricQuery.BuilderQueries[name] + mq.Temporaltiy = metricNameToTemporality[mq.MetricName] + } + } + +} + func (aH *APIHandler) QueryRangeMetricsV2(w http.ResponseWriter, r *http.Request) { metricsQueryRangeParams, apiErrorObj := parser.ParseMetricQueryRangeParams(r) @@ -453,6 +476,10 @@ func (aH *APIHandler) QueryRangeMetricsV2(w http.ResponseWriter, r *http.Request metricsQueryRangeParams.End = (end / step * step) * 1000 } + // add temporality for each metric + + aH.addTemporality(r.Context(), metricsQueryRangeParams) + type channelResult struct { Series []*model.Series Err error diff --git a/pkg/query-service/app/metrics/query_builder.go b/pkg/query-service/app/metrics/query_builder.go index 784b727514..4566c3ddc7 100644 --- a/pkg/query-service/app/metrics/query_builder.go +++ b/pkg/query-service/app/metrics/query_builder.go @@ -44,6 +44,13 @@ var AggregateOperatorToSQLFunc = map[model.AggregateOperator]string{ model.RATE_MIN: "min", } +var ( + queryCumulative = `SELECT %s ts, runningDifference(value)/runningDifference(ts) as value FROM(%s)` + queryDelta = `SELECT %s ts, value/runningDifference(ts) as value FROM(%s)` + opCumulative = `max(value)` + opDelta = `sum(value)` +) + var SupportedFunctions = []string{"exp", 
"log", "ln", "exp2", "log2", "exp10", "log10", "sqrt", "cbrt", "erf", "erfc", "lgamma", "tgamma", "sin", "cos", "tan", "asin", "acos", "atan", "degrees", "radians"} func GoValuateFuncs() map[string]govaluate.ExpressionFunction { @@ -196,29 +203,57 @@ func BuildMetricQuery(qp *model.QueryRangeParamsV2, mq *model.MetricQuery, table // Calculate rate of change of metric for each unique time series groupBy = "fingerprint, ts" groupTags = "fingerprint," - op := "max(value)" // max value should be the closest value for point in time + var op string + if mq.Temporaltiy == model.CUMULATIVE { + op = opCumulative + } else { + op = opDelta + } subQuery := fmt.Sprintf( queryTmpl, "any(labels) as labels, "+groupTags, qp.Step, op, filterSubQuery, groupBy, groupTags, ) // labels will be same so any should be fine - query := `SELECT %s ts, runningDifference(value)/runningDifference(ts) as value FROM(%s)` + var query string + if mq.Temporaltiy == model.CUMULATIVE { + query = queryCumulative + } else { + query = queryDelta + } query = fmt.Sprintf(query, "labels as fullLabels,", subQuery) return query, nil case model.SUM_RATE: rateGroupBy := "fingerprint, " + groupBy rateGroupTags := "fingerprint, " + groupTags - op := "max(value)" + var op string + if mq.Temporaltiy == model.CUMULATIVE { + op = opCumulative + } else { + op = opDelta + } subQuery := fmt.Sprintf( queryTmpl, rateGroupTags, qp.Step, op, filterSubQuery, rateGroupBy, rateGroupTags, ) // labels will be same so any should be fine - query := `SELECT %s ts, runningDifference(value)/runningDifference(ts) as value FROM(%s) OFFSET 1` + var query string + if mq.Temporaltiy == model.CUMULATIVE { + query = queryCumulative + } else { + query = queryDelta + } + query = fmt.Sprintf(query, groupTags, subQuery) query = fmt.Sprintf(`SELECT %s ts, sum(value) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTags, query, groupBy, groupTags) return query, nil case model.RATE_SUM, model.RATE_MAX, model.RATE_AVG, model.RATE_MIN: op := 
fmt.Sprintf("%s(value)", AggregateOperatorToSQLFunc[mq.AggregateOperator]) subQuery := fmt.Sprintf(queryTmpl, groupTags, qp.Step, op, filterSubQuery, groupBy, groupTags) - query := `SELECT %s ts, runningDifference(value)/runningDifference(ts) as value FROM(%s) OFFSET 1` + + var query string + if mq.Temporaltiy == model.CUMULATIVE { + query = queryCumulative + } else { + query = queryDelta + } + query = fmt.Sprintf(query, groupTags, subQuery) return query, nil case model.P05, model.P10, model.P20, model.P25, model.P50, model.P75, model.P90, model.P95, model.P99: @@ -228,11 +263,22 @@ func BuildMetricQuery(qp *model.QueryRangeParamsV2, mq *model.MetricQuery, table case model.HIST_QUANTILE_50, model.HIST_QUANTILE_75, model.HIST_QUANTILE_90, model.HIST_QUANTILE_95, model.HIST_QUANTILE_99: rateGroupBy := "fingerprint, " + groupBy rateGroupTags := "fingerprint, " + groupTags - op := "max(value)" + var op string + if mq.Temporaltiy == model.CUMULATIVE { + op = opCumulative + } else { + op = opDelta + } subQuery := fmt.Sprintf( queryTmpl, rateGroupTags, qp.Step, op, filterSubQuery, rateGroupBy, rateGroupTags, ) // labels will be same so any should be fine - query := `SELECT %s ts, runningDifference(value)/runningDifference(ts) as value FROM(%s) OFFSET 1` + var query string + if mq.Temporaltiy == model.CUMULATIVE { + query = queryCumulative + } else { + query = queryDelta + } + query = fmt.Sprintf(query, groupTags, subQuery) query = fmt.Sprintf(`SELECT %s ts, sum(value) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTags, query, groupBy, groupTags) value := AggregateOperatorToPercentile[mq.AggregateOperator] diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index c8c66669d0..36b84534d7 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -52,6 +52,7 @@ type Reader interface { // Setter Interfaces SetTTL(ctx context.Context, ttlParams *model.TTLParams) 
(*model.SetTTLResponseItem, *model.ApiError) + FetchTemporality(ctx context.Context, metricNameToTemporality map[string]model.Temporality) (map[string]model.Temporality, error) GetMetricAutocompleteMetricNames(ctx context.Context, matchText string, limit int) (*[]string, *model.ApiError) GetMetricAutocompleteTagKey(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError) GetMetricAutocompleteTagValue(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError) diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index 4730b1fabf..4dd517c71b 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -18,9 +18,18 @@ type QueryRangeParams struct { Stats string } +type Temporality int + +const ( + _ Temporality = iota + DELTA + CUMULATIVE +) + type MetricQuery struct { QueryName string `json:"queryName"` MetricName string `json:"metricName"` + Temporaltiy Temporality `json:"temporality"` TagFilters *FilterSet `json:"tagFilters,omitempty"` GroupingTags []string `json:"groupBy,omitempty"` AggregateOperator AggregateOperator `json:"aggregateOperator"` From fca64a171bba6d4adb193aca88f091fd9c9cf857 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sun, 26 Mar 2023 00:09:10 +0530 Subject: [PATCH 03/10] chore: use enum --- .../app/clickhouseReader/reader.go | 16 ++++++------- pkg/query-service/app/http_handler.go | 24 +++++++++++-------- .../app/metrics/query_builder.go | 10 ++++---- .../app/metrics/query_builder_test.go | 21 ++++++++++++---- pkg/query-service/interfaces/interface.go | 2 +- pkg/query-service/model/queryParams.go | 8 +++---- 6 files changed, 47 insertions(+), 34 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 3ced062f14..806ba52de0 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ 
b/pkg/query-service/app/clickhouseReader/reader.go @@ -3220,21 +3220,19 @@ func (r *ClickHouseReader) GetSpansInLastHeartBeatInterval(ctx context.Context) return spansInLastHeartBeatInterval, nil } -func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNameToTemporality map[string]model.Temporality) (map[string]model.Temporality, error) { +func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []string) (map[string]model.Temporality, error) { - query := fmt.Sprintf(`SELECT temporality FROM %s.%s WHERE metric_name = $1 LIMIT 1`, signozMetricDBName, signozTSTableName) + metricNameToTemporality := make(map[string]model.Temporality) - for name := range metricNameToTemporality { - var temporality string + query := fmt.Sprintf(`SELECT CAST(temporality, 'Int8') FROM %s.%s WHERE metric_name = $1 LIMIT 1`, signozMetricDBName, signozTSTableName) + for _, name := range metricNames { + var temporality int8 err := r.db.QueryRow(ctx, query, name).Scan(&temporality) if err != nil { zap.S().Error("unexpected error", zap.Error(err)) + return nil, err } - if temporality == "Cumulative" { - metricNameToTemporality[name] = model.CUMULATIVE - } else if temporality == "Delta" { - metricNameToTemporality[name] = model.DELTA - } + metricNameToTemporality[name] = model.Temporality(temporality) } return metricNameToTemporality, nil } diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index da494716d9..354a1e13a9 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -446,19 +446,18 @@ func (aH *APIHandler) metricAutocompleteTagValue(w http.ResponseWriter, r *http. 
aH.Respond(w, tagValueList) } -func (aH *APIHandler) addTemporality(ctx context.Context, qp *model.QueryRangeParamsV2) { - - metricNameToTemporality := make(map[string]model.Temporality) +func (aH *APIHandler) addTemporality(ctx context.Context, qp *model.QueryRangeParamsV2) error { + metricNames := make([]string, 0) if qp.CompositeMetricQuery != nil && len(qp.CompositeMetricQuery.BuilderQueries) > 0 { for name := range qp.CompositeMetricQuery.BuilderQueries { - mq := qp.CompositeMetricQuery.BuilderQueries[name] - metricNameToTemporality[mq.MetricName] = model.CUMULATIVE + metricNames = append(metricNames, qp.CompositeMetricQuery.BuilderQueries[name].MetricName) } } - - metricNameToTemporality, _ = aH.reader.FetchTemporality(ctx, metricNameToTemporality) - fmt.Println(metricNameToTemporality) + metricNameToTemporality, err := aH.reader.FetchTemporality(ctx, metricNames) + if err != nil { + return err + } if qp.CompositeMetricQuery != nil && len(qp.CompositeMetricQuery.BuilderQueries) > 0 { for name := range qp.CompositeMetricQuery.BuilderQueries { @@ -466,7 +465,7 @@ func (aH *APIHandler) addTemporality(ctx context.Context, qp *model.QueryRangePa mq.Temporaltiy = metricNameToTemporality[mq.MetricName] } } - + return nil } func (aH *APIHandler) QueryRangeMetricsV2(w http.ResponseWriter, r *http.Request) { @@ -493,7 +492,12 @@ func (aH *APIHandler) QueryRangeMetricsV2(w http.ResponseWriter, r *http.Request // add temporality for each metric - aH.addTemporality(r.Context(), metricsQueryRangeParams) + temporalityErr := aH.addTemporality(r.Context(), metricsQueryRangeParams) + if temporalityErr != nil { + zap.S().Errorf("Error while adding temporality for metrics: %v", temporalityErr) + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil) + return + } type channelResult struct { Series []*model.Series diff --git a/pkg/query-service/app/metrics/query_builder.go b/pkg/query-service/app/metrics/query_builder.go index ce794846d4..9c818a96a7 
100644 --- a/pkg/query-service/app/metrics/query_builder.go +++ b/pkg/query-service/app/metrics/query_builder.go @@ -48,8 +48,8 @@ var ( // See https://github.com/SigNoz/signoz/issues/2151#issuecomment-1467249056 rateWithoutNegativeCumulative = `if (runningDifference(value) < 0 OR runningDifference(ts) < 0, nan, runningDifference(value)/runningDifference(ts))` rateWithoutNegativeDelta = `if (value < 0 OR runningDifference(ts) < 0, nan, value/runningDifference(ts))` - queryCumulative = `SELECT %s ts, %s as value FROM(%s)` - queryDelta = `SELECT %s ts, %s as value FROM(%s)` + queryCumulative = `SELECT %s ts, ` + rateWithoutNegativeCumulative + ` as value FROM(%s)` + queryDelta = `SELECT %s ts, ` + rateWithoutNegativeDelta + ` as value FROM(%s)` opCumulative = `max(value)` opDelta = `sum(value)` ) @@ -202,11 +202,11 @@ func BuildMetricQuery(qp *model.QueryRangeParamsV2, mq *model.MetricQuery, table groupTags := groupSelect(mq.GroupingTags...) var rateQuery, rateOp string - if mq.Temporaltiy == model.CUMULATIVE { - rateQuery = fmt.Sprintf(queryCumulative, rateWithoutNegativeCumulative) + if mq.Temporaltiy == model.Cumulative { + rateQuery = queryCumulative rateOp = opCumulative } else { - rateQuery = fmt.Sprintf(queryDelta, rateWithoutNegativeDelta) + rateQuery = queryDelta rateOp = opDelta } diff --git a/pkg/query-service/app/metrics/query_builder_test.go b/pkg/query-service/app/metrics/query_builder_test.go index c749224689..5d42189201 100644 --- a/pkg/query-service/app/metrics/query_builder_test.go +++ b/pkg/query-service/app/metrics/query_builder_test.go @@ -21,6 +21,7 @@ func TestBuildQuery(t *testing.T) { MetricName: "name", AggregateOperator: model.RATE_MAX, Expression: "A", + Temporaltiy: model.Cumulative, }, }, }, @@ -28,7 +29,7 @@ func TestBuildQuery(t *testing.T) { queries := PrepareBuilderMetricQueries(q, "table").Queries So(len(queries), ShouldEqual, 1) So(queries["A"], ShouldContainSubstring, "WHERE metric_name = 'name'") - So(queries["A"], 
ShouldContainSubstring, rateWithoutNegative) + So(queries["A"], ShouldContainSubstring, rateWithoutNegativeCumulative) }) Convey("TestSimpleQueryWithHistQuantile", t, func() { @@ -43,6 +44,7 @@ func TestBuildQuery(t *testing.T) { MetricName: "name", AggregateOperator: model.HIST_QUANTILE_99, Expression: "A", + Temporaltiy: model.Cumulative, }, }, }, @@ -50,7 +52,7 @@ func TestBuildQuery(t *testing.T) { queries := PrepareBuilderMetricQueries(q, "table").Queries So(len(queries), ShouldEqual, 1) So(queries["A"], ShouldContainSubstring, "WHERE metric_name = 'name'") - So(queries["A"], ShouldContainSubstring, rateWithoutNegative) + So(queries["A"], ShouldContainSubstring, rateWithoutNegativeCumulative) So(queries["A"], ShouldContainSubstring, "HAVING isNaN(value) = 0") }) } @@ -72,6 +74,7 @@ func TestBuildQueryWithFilters(t *testing.T) { }}, AggregateOperator: model.RATE_MAX, Expression: "A", + Temporaltiy: model.Cumulative, }, }, }, @@ -80,7 +83,7 @@ func TestBuildQueryWithFilters(t *testing.T) { So(len(queries), ShouldEqual, 1) So(queries["A"], ShouldContainSubstring, "WHERE metric_name = 'name' AND JSONExtractString(labels, 'a') != 'b'") - So(queries["A"], ShouldContainSubstring, rateWithoutNegative) + So(queries["A"], ShouldContainSubstring, rateWithoutNegativeCumulative) So(queries["A"], ShouldContainSubstring, "not match(JSONExtractString(labels, 'code'), 'ERROR_*')") }) } @@ -101,12 +104,14 @@ func TestBuildQueryWithMultipleQueries(t *testing.T) { }}, AggregateOperator: model.RATE_AVG, Expression: "A", + Temporaltiy: model.Cumulative, }, "B": { QueryName: "B", MetricName: "name2", AggregateOperator: model.RATE_MAX, Expression: "B", + Temporaltiy: model.Cumulative, }, }, }, @@ -114,7 +119,7 @@ func TestBuildQueryWithMultipleQueries(t *testing.T) { queries := PrepareBuilderMetricQueries(q, "table").Queries So(len(queries), ShouldEqual, 2) So(queries["A"], ShouldContainSubstring, "WHERE metric_name = 'name' AND JSONExtractString(labels, 'in') IN ['a','b','c']") - 
So(queries["A"], ShouldContainSubstring, rateWithoutNegative) + So(queries["A"], ShouldContainSubstring, rateWithoutNegativeCumulative) }) } @@ -134,11 +139,13 @@ func TestBuildQueryWithMultipleQueriesAndFormula(t *testing.T) { }}, AggregateOperator: model.RATE_MAX, Expression: "A", + Temporaltiy: model.Cumulative, }, "B": { MetricName: "name2", AggregateOperator: model.RATE_AVG, Expression: "B", + Temporaltiy: model.Cumulative, }, "C": { QueryName: "C", @@ -151,7 +158,7 @@ func TestBuildQueryWithMultipleQueriesAndFormula(t *testing.T) { So(len(queries), ShouldEqual, 3) So(queries["C"], ShouldContainSubstring, "SELECT A.ts as ts, A.value / B.value") So(queries["C"], ShouldContainSubstring, "WHERE metric_name = 'name' AND JSONExtractString(labels, 'in') IN ['a','b','c']") - So(queries["C"], ShouldContainSubstring, rateWithoutNegative) + So(queries["C"], ShouldContainSubstring, rateWithoutNegativeCumulative) }) } @@ -171,6 +178,7 @@ func TestBuildQueryWithIncorrectQueryRef(t *testing.T) { }}, AggregateOperator: model.RATE_MAX, Expression: "A", + Temporaltiy: model.Cumulative, }, "C": { QueryName: "C", @@ -201,18 +209,21 @@ func TestBuildQueryWithThreeOrMoreQueriesRefAndFormula(t *testing.T) { }}, AggregateOperator: model.RATE_MAX, Expression: "A", + Temporaltiy: model.Cumulative, Disabled: true, }, "B": { MetricName: "name2", AggregateOperator: model.RATE_AVG, Expression: "B", + Temporaltiy: model.Cumulative, Disabled: true, }, "C": { MetricName: "name3", AggregateOperator: model.SUM_RATE, Expression: "C", + Temporaltiy: model.Cumulative, Disabled: true, }, "F1": { diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 05baa5c043..e46800df63 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -52,7 +52,7 @@ type Reader interface { // Setter Interfaces SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) - 
FetchTemporality(ctx context.Context, metricNameToTemporality map[string]model.Temporality) (map[string]model.Temporality, error) + FetchTemporality(ctx context.Context, metricNames []string) (map[string]model.Temporality, error) GetMetricAutocompleteMetricNames(ctx context.Context, matchText string, limit int) (*[]string, *model.ApiError) GetMetricAutocompleteTagKey(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError) GetMetricAutocompleteTagValue(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError) diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index 4dd517c71b..d967dc5c5a 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -18,12 +18,12 @@ type QueryRangeParams struct { Stats string } -type Temporality int +type Temporality int8 const ( - _ Temporality = iota - DELTA - CUMULATIVE + Unspecified Temporality = iota + Delta + Cumulative ) type MetricQuery struct { From b0ca2056457118b668c4206f11bd6f9fc5f05555 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sun, 26 Mar 2023 00:10:55 +0530 Subject: [PATCH 04/10] Revert "Revert "upgraded some deprecated packages (#2423)"" This reverts commit d88f3594a62582a16fa04528a56ff075f628bf4e. 
--- .../container/ConfigDropdown/Config/styles.ts | 17 ---- .../src/container/ConfigDropdown/index.tsx | 23 ++--- .../Header/{SignedInAs => SignedIn}/index.tsx | 21 +++-- frontend/src/container/Header/index.tsx | 84 ++++++++++++------- .../src/container/ListOfDashboard/index.tsx | 23 ++++- .../src/container/LogsSearchFilter/index.tsx | 13 ++- 6 files changed, 105 insertions(+), 76 deletions(-) delete mode 100644 frontend/src/container/ConfigDropdown/Config/styles.ts rename frontend/src/container/Header/{SignedInAs => SignedIn}/index.tsx (69%) diff --git a/frontend/src/container/ConfigDropdown/Config/styles.ts b/frontend/src/container/ConfigDropdown/Config/styles.ts deleted file mode 100644 index 4807ea77c2..0000000000 --- a/frontend/src/container/ConfigDropdown/Config/styles.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { Menu } from 'antd'; -import styled from 'styled-components'; - -export const MenuDropdown = styled(Menu)` - &&& { - .ant-dropdown, - .ant-dropdown-menu, - .ant-dropdown-menu-item { - padding: 0px; - } - .ant-menu-item { - height: 1.75rem; - display: flex; - align-items: center; - } - } -`; diff --git a/frontend/src/container/ConfigDropdown/index.tsx b/frontend/src/container/ConfigDropdown/index.tsx index 8390e09167..1ddd676948 100644 --- a/frontend/src/container/ConfigDropdown/index.tsx +++ b/frontend/src/container/ConfigDropdown/index.tsx @@ -13,7 +13,6 @@ import { ConfigProps } from 'types/api/dynamicConfigs/getDynamicConfigs'; import AppReducer from 'types/reducer/app'; import HelpToolTip from './Config'; -import { MenuDropdown } from './Config/styles'; function DynamicConfigDropdown({ frontendId, @@ -34,13 +33,15 @@ function DynamicConfigDropdown({ setIsHelpDropDownOpen(!isHelpDropDownOpen); }; - const menuItems = useMemo( - () => [ - { - key: '1', - label: , - }, - ], + const menu = useMemo( + () => ({ + items: [ + { + key: '1', + label: , + }, + ], + }), [config], ); @@ -53,10 +54,10 @@ function DynamicConfigDropdown({ return ( } - 
visible={isHelpDropDownOpen} + menu={menu} + open={isHelpDropDownOpen} > ((state) => state.app); + const onManageAccountClick = useCallback(() => { + onToggle(); + history.push(ROUTES.MY_SETTINGS); + }, [onToggle]); + if (!user) { return
; } @@ -30,11 +35,7 @@ function SignedInAS(): JSX.Element { {email}
- { - history.push(ROUTES.MY_SETTINGS); - }} - > + Manage Account @@ -42,4 +43,8 @@ function SignedInAS(): JSX.Element { ); } -export default SignedInAS; +interface SignedInProps { + onToggle: VoidFunction; +} + +export default SignedIn; diff --git a/frontend/src/container/Header/index.tsx b/frontend/src/container/Header/index.tsx index 9f04454d33..a34287e665 100644 --- a/frontend/src/container/Header/index.tsx +++ b/frontend/src/container/Header/index.tsx @@ -3,12 +3,19 @@ import { CaretUpFilled, LogoutOutlined, } from '@ant-design/icons'; -import { Divider, Dropdown, Menu, Space, Typography } from 'antd'; +import type { MenuProps } from 'antd'; +import { Divider, Dropdown, Space, Typography } from 'antd'; import { Logout } from 'api/utils'; import ROUTES from 'constants/routes'; import Config from 'container/ConfigDropdown'; import { useIsDarkMode, useThemeMode } from 'hooks/useDarkMode'; -import React, { Dispatch, SetStateAction, useCallback, useState } from 'react'; +import React, { + Dispatch, + SetStateAction, + useCallback, + useMemo, + useState, +} from 'react'; import { useSelector } from 'react-redux'; import { NavLink } from 'react-router-dom'; import { AppState } from 'store/reducers'; @@ -16,7 +23,7 @@ import AppReducer from 'types/reducer/app'; import CurrentOrganization from './CurrentOrganization'; import ManageLicense from './ManageLicense'; -import SignedInAS from './SignedInAs'; +import SignedIn from './SignedIn'; import { AvatarWrapper, Container, @@ -43,32 +50,45 @@ function HeaderContainer(): JSX.Element { [], ); - const menu = ( - - - - - - - - - - -
{ - if (e.key === 'Enter' || e.key === 'Space') { - Logout(); - } - }} - role="button" - onClick={Logout} - > - Logout -
-
-
-
+ const onLogoutKeyDown = useCallback( + (e: React.KeyboardEvent) => { + if (e.key === 'Enter' || e.key === 'Space') { + Logout(); + } + }, + [], + ); + + const menu: MenuProps = useMemo( + () => ({ + items: [ + { + key: 'main-menu', + label: ( +
+ + + + + + + + +
+ Logout +
+
+
+ ), + }, + ], + }), + [onToggleHandler, onLogoutKeyDown], ); return ( @@ -98,10 +118,10 @@ function HeaderContainer(): JSX.Element { /> {user?.name[0]} diff --git a/frontend/src/container/ListOfDashboard/index.tsx b/frontend/src/container/ListOfDashboard/index.tsx index b513df0c6a..ca5d68d9bb 100644 --- a/frontend/src/container/ListOfDashboard/index.tsx +++ b/frontend/src/container/ListOfDashboard/index.tsx @@ -1,5 +1,12 @@ import { PlusOutlined } from '@ant-design/icons'; -import { Card, Dropdown, Menu, Row, TableColumnProps, Typography } from 'antd'; +import { + Card, + Dropdown, + MenuProps, + Row, + TableColumnProps, + Typography, +} from 'antd'; import { ItemType } from 'antd/es/menu/hooks/useItems'; import createDashboard from 'api/dashboard/create'; import { AxiosError } from 'axios'; @@ -47,10 +54,12 @@ function ListOfAllDashboard(): JSX.Element { ); const { t } = useTranslation('dashboard'); + const [ isImportJSONModalVisible, setIsImportJSONModalVisible, ] = useState(false); + const [uploadedGrafana, setUploadedGrafana] = useState(false); const [filteredDashboards, setFilteredDashboards] = useState(); @@ -58,6 +67,7 @@ function ListOfAllDashboard(): JSX.Element { useEffect(() => { setFilteredDashboards(dashboards); }, [dashboards]); + const [newDashboardState, setNewDashboardState] = useState({ loading: false, error: false, @@ -215,7 +225,12 @@ function ListOfAllDashboard(): JSX.Element { return menuItems; }, [createNewDashboard, loading, onNewDashboardHandler, t]); - const menuItems = getMenuItems(); + const menu: MenuProps = useMemo( + () => ({ + items: getMenuItems(), + }), + [getMenuItems], + ); const GetHeader = useMemo( () => ( @@ -230,7 +245,7 @@ function ListOfAllDashboard(): JSX.Element { }} /> {newDashboard && ( - }> + } type="primary" @@ -249,7 +264,7 @@ function ListOfAllDashboard(): JSX.Element { newDashboard, newDashboardState.error, newDashboardState.loading, - menuItems, + menu, ], ); diff --git 
a/frontend/src/container/LogsSearchFilter/index.tsx b/frontend/src/container/LogsSearchFilter/index.tsx index 670c58cf70..75f95769f1 100644 --- a/frontend/src/container/LogsSearchFilter/index.tsx +++ b/frontend/src/container/LogsSearchFilter/index.tsx @@ -173,6 +173,13 @@ function SearchFilter({ globalTime.minTime, ]); + const onPopOverChange = useCallback( + (isVisible: boolean) => { + onDropDownToggleHandler(isVisible)(); + }, + [onDropDownToggleHandler], + ); + return ( { - onDropDownToggleHandler(value)(); - }} + onOpenChange={onPopOverChange} > Date: Sun, 26 Mar 2023 01:10:29 +0530 Subject: [PATCH 05/10] chore: add test --- .../app/clickhouseReader/reader.go | 6 +++- .../app/metrics/query_builder.go | 8 ++--- .../app/metrics/query_builder_test.go | 34 +++++++++++++++++++ 3 files changed, 43 insertions(+), 5 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 806ba52de0..7d9fd958ac 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -3,6 +3,7 @@ package clickhouseReader import ( "bytes" "context" + "database/sql" "encoding/json" "fmt" @@ -2166,7 +2167,6 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query } err := r.db.Select(ctx, &SpanAggregatesDBResponseItems, query, args...) - fmt.Println(args...) 
zap.S().Info(query) @@ -3229,6 +3229,10 @@ func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []s var temporality int8 err := r.db.QueryRow(ctx, query, name).Scan(&temporality) if err != nil { + if err == sql.ErrNoRows { + metricNameToTemporality[name] = model.Unspecified + continue + } zap.S().Error("unexpected error", zap.Error(err)) return nil, err } diff --git a/pkg/query-service/app/metrics/query_builder.go b/pkg/query-service/app/metrics/query_builder.go index 9c818a96a7..1a7e7ae9ad 100644 --- a/pkg/query-service/app/metrics/query_builder.go +++ b/pkg/query-service/app/metrics/query_builder.go @@ -202,12 +202,12 @@ func BuildMetricQuery(qp *model.QueryRangeParamsV2, mq *model.MetricQuery, table groupTags := groupSelect(mq.GroupingTags...) var rateQuery, rateOp string - if mq.Temporaltiy == model.Cumulative { - rateQuery = queryCumulative - rateOp = opCumulative - } else { + if mq.Temporaltiy == model.Delta { rateQuery = queryDelta rateOp = opDelta + } else { + rateQuery = queryCumulative + rateOp = opCumulative } switch mq.AggregateOperator { diff --git a/pkg/query-service/app/metrics/query_builder_test.go b/pkg/query-service/app/metrics/query_builder_test.go index 5d42189201..faa48d76f1 100644 --- a/pkg/query-service/app/metrics/query_builder_test.go +++ b/pkg/query-service/app/metrics/query_builder_test.go @@ -121,6 +121,40 @@ func TestBuildQueryWithMultipleQueries(t *testing.T) { So(queries["A"], ShouldContainSubstring, "WHERE metric_name = 'name' AND JSONExtractString(labels, 'in') IN ['a','b','c']") So(queries["A"], ShouldContainSubstring, rateWithoutNegativeCumulative) }) + + // delta temporality + Convey("TestBuildQueryWithFilters", t, func() { + q := &model.QueryRangeParamsV2{ + Start: 1650991982000, + End: 1651078382000, + Step: 60, + CompositeMetricQuery: &model.CompositeMetricQuery{ + BuilderQueries: map[string]*model.MetricQuery{ + "A": { + QueryName: "A", + MetricName: "name", + TagFilters: &model.FilterSet{Operator: 
"AND", Items: []model.FilterItem{ + {Key: "in", Value: []interface{}{"a", "b", "c"}, Operator: "in"}, + }}, + AggregateOperator: model.RATE_AVG, + Expression: "A", + Temporaltiy: model.Delta, + }, + "B": { + QueryName: "B", + MetricName: "name2", + AggregateOperator: model.RATE_MAX, + Expression: "B", + Temporaltiy: model.Delta, + }, + }, + }, + } + queries := PrepareBuilderMetricQueries(q, "table").Queries + So(len(queries), ShouldEqual, 2) + So(queries["A"], ShouldContainSubstring, "WHERE metric_name = 'name' AND JSONExtractString(labels, 'in') IN ['a','b','c']") + So(queries["A"], ShouldContainSubstring, rateWithoutNegativeDelta) + }) } func TestBuildQueryWithMultipleQueriesAndFormula(t *testing.T) { From 10397249de3d81741ac907b038bb563790975fe7 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Mon, 10 Jul 2023 22:50:26 +0530 Subject: [PATCH 06/10] chore: add delta for query range v3 --- ee/query-service/app/api/api.go | 2 + ee/query-service/app/server.go | 2 + ee/query-service/main.go | 3 + .../app/clickhouseReader/reader.go | 29 +-- pkg/query-service/app/http_handler.go | 48 +++-- .../app/metrics/query_builder.go | 20 +- .../app/metrics/query_builder_test.go | 55 +----- pkg/query-service/app/metrics/v3/delta.go | 174 ++++++++++++++++++ .../app/metrics/v3/query_builder.go | 20 +- .../app/metrics/v3/query_builder_test.go | 7 +- pkg/query-service/app/server.go | 2 + pkg/query-service/interfaces/interface.go | 2 +- pkg/query-service/main.go | 4 + pkg/query-service/model/queryParams.go | 9 - pkg/query-service/model/v3/v3.go | 9 + 15 files changed, 279 insertions(+), 107 deletions(-) create mode 100644 pkg/query-service/app/metrics/v3/delta.go diff --git a/ee/query-service/app/api/api.go b/ee/query-service/app/api/api.go index df1220d80c..2eddf1d83c 100644 --- a/ee/query-service/app/api/api.go +++ b/ee/query-service/app/api/api.go @@ -17,6 +17,7 @@ import ( type APIHandlerOptions struct { DataConnector interfaces.DataConnector SkipConfig *basemodel.SkipConfig + 
PreferDelta bool AppDao dao.ModelDao RulesManager *rules.Manager FeatureFlags baseint.FeatureLookup @@ -34,6 +35,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) { baseHandler, err := baseapp.NewAPIHandler(baseapp.APIHandlerOpts{ Reader: opts.DataConnector, SkipConfig: opts.SkipConfig, + PerferDelta: opts.PreferDelta, AppDao: opts.AppDao, RuleManager: opts.RulesManager, FeatureFlags: opts.FeatureFlags}) diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index ec2895acd8..a2e86023e3 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -56,6 +56,7 @@ type ServerOptions struct { // alert specific params DisableRules bool RuleRepoURL string + PreferDelta bool } // Server runs HTTP api service @@ -170,6 +171,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { apiOpts := api.APIHandlerOptions{ DataConnector: reader, SkipConfig: skipConfig, + PreferDelta: serverOptions.PreferDelta, AppDao: modelDao, RulesManager: rm, FeatureFlags: lm, diff --git a/ee/query-service/main.go b/ee/query-service/main.go index 67cbde2151..52ce63ba20 100644 --- a/ee/query-service/main.go +++ b/ee/query-service/main.go @@ -83,10 +83,12 @@ func main() { var ruleRepoURL string var enableQueryServiceLogOTLPExport bool + var preferDelta bool flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)") flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)") flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)") + flag.BoolVar(&preferDelta, "prefer-delta", false, "(prefer delta over raw metrics)") flag.StringVar(&ruleRepoURL, "rules.repo-url", baseconst.AlertHelpPage, "(host address used to build rule link in alert messages)") flag.BoolVar(&enableQueryServiceLogOTLPExport, "enable.query.service.log.otlp.export", false, "(enable query service log otlp export)") flag.Parse() @@ -102,6 
+104,7 @@ func main() { HTTPHostPort: baseconst.HTTPHostPort, PromConfigPath: promConfigPath, SkipTopLvlOpsPath: skipTopLvlOpsPath, + PreferDelta: preferDelta, PrivateHostPort: baseconst.PrivateHostPort, DisableRules: disableRules, RuleRepoURL: ruleRepoURL, diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index fb28e79926..5120e7dde2 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -3249,23 +3249,28 @@ func (r *ClickHouseReader) GetSpansInLastHeartBeatInterval(ctx context.Context) return spansInLastHeartBeatInterval, nil } -func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []string) (map[string]model.Temporality, error) { +func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []string) (map[string]map[v3.Temporality]bool, error) { - metricNameToTemporality := make(map[string]model.Temporality) + metricNameToTemporality := make(map[string]map[v3.Temporality]bool) - query := fmt.Sprintf(`SELECT CAST(temporality, 'Int8') FROM %s.%s WHERE metric_name = $1 LIMIT 1`, signozMetricDBName, signozTSTableName) - for _, name := range metricNames { - var temporality int8 - err := r.db.QueryRow(ctx, query, name).Scan(&temporality) + query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN [$1]`, signozMetricDBName, signozTSTableName) + + rows, err := r.db.Query(ctx, query, metricNames) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var metricName, temporality string + err := rows.Scan(&metricName, &temporality) if err != nil { - if err == sql.ErrNoRows { - metricNameToTemporality[name] = model.Unspecified - continue - } - zap.S().Error("unexpected error", zap.Error(err)) return nil, err } - metricNameToTemporality[name] = model.Temporality(temporality) + if _, ok := metricNameToTemporality[metricName]; !ok { + 
metricNameToTemporality[metricName] = make(map[v3.Temporality]bool) + } + metricNameToTemporality[metricName][v3.Temporality(temporality)] = true } return metricNameToTemporality, nil } diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 074737282c..85ca567c6b 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -70,6 +70,7 @@ type APIHandler struct { featureFlags interfaces.FeatureLookup ready func(http.HandlerFunc) http.HandlerFunc queryBuilder *queryBuilder.QueryBuilder + preferDelta bool // SetupCompleted indicates if SigNoz is ready for general use. // at the moment, we mark the app ready when the first user @@ -83,6 +84,8 @@ type APIHandlerOpts struct { Reader interfaces.Reader SkipConfig *model.SkipConfig + + PerferDelta bool // dao layer to perform crud on app objects like dashboard, alerts etc AppDao dao.ModelDao @@ -105,6 +108,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { reader: opts.Reader, appDao: opts.AppDao, skipConfig: opts.SkipConfig, + preferDelta: opts.PerferDelta, alertManager: alertManager, ruleManager: opts.RuleManager, featureFlags: opts.FeatureFlags, @@ -451,12 +455,14 @@ func (aH *APIHandler) metricAutocompleteTagValue(w http.ResponseWriter, r *http. 
aH.Respond(w, tagValueList) } -func (aH *APIHandler) addTemporality(ctx context.Context, qp *model.QueryRangeParamsV2) error { +func (aH *APIHandler) addTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error { metricNames := make([]string, 0) - if qp.CompositeMetricQuery != nil && len(qp.CompositeMetricQuery.BuilderQueries) > 0 { - for name := range qp.CompositeMetricQuery.BuilderQueries { - metricNames = append(metricNames, qp.CompositeMetricQuery.BuilderQueries[name].MetricName) + if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 { + for _, query := range qp.CompositeQuery.BuilderQueries { + if query.DataSource == v3.DataSourceMetrics { + metricNames = append(metricNames, query.AggregateAttribute.Key) + } } } metricNameToTemporality, err := aH.reader.FetchTemporality(ctx, metricNames) @@ -464,10 +470,18 @@ func (aH *APIHandler) addTemporality(ctx context.Context, qp *model.QueryRangePa return err } - if qp.CompositeMetricQuery != nil && len(qp.CompositeMetricQuery.BuilderQueries) > 0 { - for name := range qp.CompositeMetricQuery.BuilderQueries { - mq := qp.CompositeMetricQuery.BuilderQueries[name] - mq.Temporaltiy = metricNameToTemporality[mq.MetricName] + if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 { + for name := range qp.CompositeQuery.BuilderQueries { + query := qp.CompositeQuery.BuilderQueries[name] + if query.DataSource == v3.DataSourceMetrics { + if aH.preferDelta && metricNameToTemporality[query.AggregateAttribute.Key][v3.Delta] { + query.Temporality = v3.Delta + } else if metricNameToTemporality[query.AggregateAttribute.Key][v3.Cumulative] { + query.Temporality = v3.Cumulative + } else { + query.Temporality = v3.Unspecified + } + } } } return nil @@ -495,15 +509,6 @@ func (aH *APIHandler) QueryRangeMetricsV2(w http.ResponseWriter, r *http.Request metricsQueryRangeParams.End = (end / step * step) * 1000 } - // add temporality for each metric - - temporalityErr := aH.addTemporality(r.Context(), 
metricsQueryRangeParams) - if temporalityErr != nil { - zap.S().Errorf("Error while adding temporality for metrics: %v", temporalityErr) - RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil) - return - } - type channelResult struct { Series []*model.Series Err error @@ -2804,5 +2809,14 @@ func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) { return } + // add temporality for each metric + + temporalityErr := aH.addTemporality(r.Context(), queryRangeParams) + if temporalityErr != nil { + zap.S().Errorf("Error while adding temporality for metrics: %v", temporalityErr) + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil) + return + } + aH.queryRangeV3(r.Context(), queryRangeParams, w, r) } diff --git a/pkg/query-service/app/metrics/query_builder.go b/pkg/query-service/app/metrics/query_builder.go index 597605b9e9..435e011dbd 100644 --- a/pkg/query-service/app/metrics/query_builder.go +++ b/pkg/query-service/app/metrics/query_builder.go @@ -194,32 +194,25 @@ func BuildMetricQuery(qp *model.QueryRangeParamsV2, mq *model.MetricQuery, table groupBy := groupBy(mq.GroupingTags...) groupTags := groupSelect(mq.GroupingTags...) 
- var rateQuery, rateOp string - if mq.Temporaltiy == model.Delta { - rateQuery = queryDelta - rateOp = opDelta - } else { - rateQuery = queryCumulative - rateOp = opCumulative - } - switch mq.AggregateOperator { case model.RATE: // Calculate rate of change of metric for each unique time series groupBy = "fingerprint, ts" groupTags = "fingerprint," + op := "max(value)" // max value should be the closest value for point in time subQuery := fmt.Sprintf( - queryTmpl, "any(labels) as labels, "+groupTags, qp.Step, rateOp, filterSubQuery, groupBy, groupTags, + queryTmpl, "any(labels) as labels, "+groupTags, qp.Step, op, filterSubQuery, groupBy, groupTags, ) // labels will be same so any should be fine query := `SELECT %s ts, ` + rateWithoutNegative + ` as value FROM(%s) WHERE isNaN(value) = 0` - query := fmt.Sprintf(rateQuery, "labels as fullLabels,", subQuery) + query = fmt.Sprintf(query, "labels as fullLabels,", subQuery) return query, nil case model.SUM_RATE: rateGroupBy := "fingerprint, " + groupBy rateGroupTags := "fingerprint, " + groupTags + op := "max(value)" subQuery := fmt.Sprintf( - queryTmpl, rateGroupTags, qp.Step, rateOp, filterSubQuery, rateGroupBy, rateGroupTags, + queryTmpl, rateGroupTags, qp.Step, op, filterSubQuery, rateGroupBy, rateGroupTags, ) // labels will be same so any should be fine query := `SELECT %s ts, ` + rateWithoutNegative + `as value FROM(%s) WHERE isNaN(value) = 0` query = fmt.Sprintf(query, groupTags, subQuery) @@ -238,8 +231,9 @@ func BuildMetricQuery(qp *model.QueryRangeParamsV2, mq *model.MetricQuery, table case model.HIST_QUANTILE_50, model.HIST_QUANTILE_75, model.HIST_QUANTILE_90, model.HIST_QUANTILE_95, model.HIST_QUANTILE_99: rateGroupBy := "fingerprint, " + groupBy rateGroupTags := "fingerprint, " + groupTags + op := "max(value)" subQuery := fmt.Sprintf( - queryTmpl, rateGroupTags, qp.Step, rateOp, filterSubQuery, rateGroupBy, rateGroupTags, + queryTmpl, rateGroupTags, qp.Step, op, filterSubQuery, rateGroupBy, rateGroupTags, ) 
// labels will be same so any should be fine query := `SELECT %s ts, ` + rateWithoutNegative + ` as value FROM(%s) WHERE isNaN(value) = 0` query = fmt.Sprintf(query, groupTags, subQuery) diff --git a/pkg/query-service/app/metrics/query_builder_test.go b/pkg/query-service/app/metrics/query_builder_test.go index faa48d76f1..c749224689 100644 --- a/pkg/query-service/app/metrics/query_builder_test.go +++ b/pkg/query-service/app/metrics/query_builder_test.go @@ -21,7 +21,6 @@ func TestBuildQuery(t *testing.T) { MetricName: "name", AggregateOperator: model.RATE_MAX, Expression: "A", - Temporaltiy: model.Cumulative, }, }, }, @@ -29,7 +28,7 @@ func TestBuildQuery(t *testing.T) { queries := PrepareBuilderMetricQueries(q, "table").Queries So(len(queries), ShouldEqual, 1) So(queries["A"], ShouldContainSubstring, "WHERE metric_name = 'name'") - So(queries["A"], ShouldContainSubstring, rateWithoutNegativeCumulative) + So(queries["A"], ShouldContainSubstring, rateWithoutNegative) }) Convey("TestSimpleQueryWithHistQuantile", t, func() { @@ -44,7 +43,6 @@ func TestBuildQuery(t *testing.T) { MetricName: "name", AggregateOperator: model.HIST_QUANTILE_99, Expression: "A", - Temporaltiy: model.Cumulative, }, }, }, @@ -52,7 +50,7 @@ func TestBuildQuery(t *testing.T) { queries := PrepareBuilderMetricQueries(q, "table").Queries So(len(queries), ShouldEqual, 1) So(queries["A"], ShouldContainSubstring, "WHERE metric_name = 'name'") - So(queries["A"], ShouldContainSubstring, rateWithoutNegativeCumulative) + So(queries["A"], ShouldContainSubstring, rateWithoutNegative) So(queries["A"], ShouldContainSubstring, "HAVING isNaN(value) = 0") }) } @@ -74,7 +72,6 @@ func TestBuildQueryWithFilters(t *testing.T) { }}, AggregateOperator: model.RATE_MAX, Expression: "A", - Temporaltiy: model.Cumulative, }, }, }, @@ -83,7 +80,7 @@ func TestBuildQueryWithFilters(t *testing.T) { So(len(queries), ShouldEqual, 1) So(queries["A"], ShouldContainSubstring, "WHERE metric_name = 'name' AND 
JSONExtractString(labels, 'a') != 'b'") - So(queries["A"], ShouldContainSubstring, rateWithoutNegativeCumulative) + So(queries["A"], ShouldContainSubstring, rateWithoutNegative) So(queries["A"], ShouldContainSubstring, "not match(JSONExtractString(labels, 'code'), 'ERROR_*')") }) } @@ -104,14 +101,12 @@ func TestBuildQueryWithMultipleQueries(t *testing.T) { }}, AggregateOperator: model.RATE_AVG, Expression: "A", - Temporaltiy: model.Cumulative, }, "B": { QueryName: "B", MetricName: "name2", AggregateOperator: model.RATE_MAX, Expression: "B", - Temporaltiy: model.Cumulative, }, }, }, @@ -119,41 +114,7 @@ func TestBuildQueryWithMultipleQueries(t *testing.T) { queries := PrepareBuilderMetricQueries(q, "table").Queries So(len(queries), ShouldEqual, 2) So(queries["A"], ShouldContainSubstring, "WHERE metric_name = 'name' AND JSONExtractString(labels, 'in') IN ['a','b','c']") - So(queries["A"], ShouldContainSubstring, rateWithoutNegativeCumulative) - }) - - // delta temporality - Convey("TestBuildQueryWithFilters", t, func() { - q := &model.QueryRangeParamsV2{ - Start: 1650991982000, - End: 1651078382000, - Step: 60, - CompositeMetricQuery: &model.CompositeMetricQuery{ - BuilderQueries: map[string]*model.MetricQuery{ - "A": { - QueryName: "A", - MetricName: "name", - TagFilters: &model.FilterSet{Operator: "AND", Items: []model.FilterItem{ - {Key: "in", Value: []interface{}{"a", "b", "c"}, Operator: "in"}, - }}, - AggregateOperator: model.RATE_AVG, - Expression: "A", - Temporaltiy: model.Delta, - }, - "B": { - QueryName: "B", - MetricName: "name2", - AggregateOperator: model.RATE_MAX, - Expression: "B", - Temporaltiy: model.Delta, - }, - }, - }, - } - queries := PrepareBuilderMetricQueries(q, "table").Queries - So(len(queries), ShouldEqual, 2) - So(queries["A"], ShouldContainSubstring, "WHERE metric_name = 'name' AND JSONExtractString(labels, 'in') IN ['a','b','c']") - So(queries["A"], ShouldContainSubstring, rateWithoutNegativeDelta) + So(queries["A"], 
ShouldContainSubstring, rateWithoutNegative) }) } @@ -173,13 +134,11 @@ func TestBuildQueryWithMultipleQueriesAndFormula(t *testing.T) { }}, AggregateOperator: model.RATE_MAX, Expression: "A", - Temporaltiy: model.Cumulative, }, "B": { MetricName: "name2", AggregateOperator: model.RATE_AVG, Expression: "B", - Temporaltiy: model.Cumulative, }, "C": { QueryName: "C", @@ -192,7 +151,7 @@ func TestBuildQueryWithMultipleQueriesAndFormula(t *testing.T) { So(len(queries), ShouldEqual, 3) So(queries["C"], ShouldContainSubstring, "SELECT A.ts as ts, A.value / B.value") So(queries["C"], ShouldContainSubstring, "WHERE metric_name = 'name' AND JSONExtractString(labels, 'in') IN ['a','b','c']") - So(queries["C"], ShouldContainSubstring, rateWithoutNegativeCumulative) + So(queries["C"], ShouldContainSubstring, rateWithoutNegative) }) } @@ -212,7 +171,6 @@ func TestBuildQueryWithIncorrectQueryRef(t *testing.T) { }}, AggregateOperator: model.RATE_MAX, Expression: "A", - Temporaltiy: model.Cumulative, }, "C": { QueryName: "C", @@ -243,21 +201,18 @@ func TestBuildQueryWithThreeOrMoreQueriesRefAndFormula(t *testing.T) { }}, AggregateOperator: model.RATE_MAX, Expression: "A", - Temporaltiy: model.Cumulative, Disabled: true, }, "B": { MetricName: "name2", AggregateOperator: model.RATE_AVG, Expression: "B", - Temporaltiy: model.Cumulative, Disabled: true, }, "C": { MetricName: "name3", AggregateOperator: model.SUM_RATE, Expression: "C", - Temporaltiy: model.Cumulative, Disabled: true, }, "F1": { diff --git a/pkg/query-service/app/metrics/v3/delta.go b/pkg/query-service/app/metrics/v3/delta.go new file mode 100644 index 0000000000..ba3d2783ce --- /dev/null +++ b/pkg/query-service/app/metrics/v3/delta.go @@ -0,0 +1,174 @@ +package v3 + +import ( + "fmt" + + "go.signoz.io/signoz/pkg/query-service/constants" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/utils" +) + +func buildDeltaMetricQuery(start, end, step int64, mq *v3.BuilderQuery, 
tableName string) (string, error) { + + metricQueryGroupBy := mq.GroupBy + + // if the aggregate operator is a histogram quantile, and user has not forgotten + // the le tag in the group by then add the le tag to the group by + if mq.AggregateOperator == v3.AggregateOperatorHistQuant50 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant75 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant90 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant95 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant99 { + found := false + for _, tag := range mq.GroupBy { + if tag.Key == "le" { + found = true + break + } + } + if !found { + metricQueryGroupBy = append( + metricQueryGroupBy, + v3.AttributeKey{ + Key: "le", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + IsColumn: false, + }, + ) + } + } + + if mq.Filters != nil { + mq.Filters.Items = append(mq.Filters.Items, v3.FilterItem{ + Key: v3.AttributeKey{Key: "__temporality__"}, + Operator: v3.FilterOperatorEqual, + Value: "Delta", + }) + } + + filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq) + if err != nil { + return "", err + } + + samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) + + // Select the aggregate value for interval + queryTmpl := + "SELECT %s" + + " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," + + " %s as value" + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." 
+ constants.SIGNOZ_SAMPLES_TABLENAME + + " GLOBAL INNER JOIN" + + " (%s) as filtered_time_series" + + " USING fingerprint" + + " WHERE " + samplesTableTimeFilter + + " GROUP BY %s" + + " ORDER BY %s ts" + + // tagsWithoutLe is used to group by all tags except le + // This is done because we want to group by le only when we are calculating quantile + // Otherwise, we want to group by all tags except le + tagsWithoutLe := []string{} + for _, tag := range mq.GroupBy { + if tag.Key != "le" { + tagsWithoutLe = append(tagsWithoutLe, tag.Key) + } + } + + groupByWithoutLe := groupBy(tagsWithoutLe...) + groupTagsWithoutLe := groupSelect(tagsWithoutLe...) + orderWithoutLe := orderBy(mq.OrderBy, tagsWithoutLe) + + groupBy := groupByAttributeKeyTags(metricQueryGroupBy...) + groupTags := groupSelectAttributeKeyTags(metricQueryGroupBy...) + orderBy := orderByAttributeKeyTags(mq.OrderBy, metricQueryGroupBy) + + if len(orderBy) != 0 { + orderBy += "," + } + if len(orderWithoutLe) != 0 { + orderWithoutLe += "," + } + + switch mq.AggregateOperator { + case v3.AggregateOperatorRate: + // Calculate rate of change of metric for each unique time series + groupBy = "fingerprint, ts" + orderBy = "fingerprint, " + groupTags = "fingerprint," + op := fmt.Sprintf("sum(value)/%d", step) + query := fmt.Sprintf( + queryTmpl, "any(labels) as fullLabels, "+groupTags, step, op, filterSubQuery, groupBy, orderBy, + ) // labels will be same so any should be fine + + return query, nil + case v3.AggregateOperatorSumRate, v3.AggregateOperatorAvgRate, v3.AggregateOperatorMaxRate, v3.AggregateOperatorMinRate: + op := fmt.Sprintf("%s(value)/%d", aggregateOperatorToSQLFunc[mq.AggregateOperator], step) + query := fmt.Sprintf( + queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy, + ) + return query, nil + case + v3.AggregateOperatorRateSum, + v3.AggregateOperatorRateMax, + v3.AggregateOperatorRateAvg, + v3.AggregateOperatorRateMin: + op := fmt.Sprintf("%s(value)/%d", 
aggregateOperatorToSQLFunc[mq.AggregateOperator], step) + query := fmt.Sprintf( + queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy, + ) + return query, nil + case + v3.AggregateOperatorP05, + v3.AggregateOperatorP10, + v3.AggregateOperatorP20, + v3.AggregateOperatorP25, + v3.AggregateOperatorP50, + v3.AggregateOperatorP75, + v3.AggregateOperatorP90, + v3.AggregateOperatorP95, + v3.AggregateOperatorP99: + op := fmt.Sprintf("quantile(%v)(value)", aggregateOperatorToPercentile[mq.AggregateOperator]) + query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorHistQuant50, v3.AggregateOperatorHistQuant75, v3.AggregateOperatorHistQuant90, v3.AggregateOperatorHistQuant95, v3.AggregateOperatorHistQuant99: + op := fmt.Sprintf("sum(value)/%d", step) + query := fmt.Sprintf( + queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy, + ) // labels will be same so any should be fine + value := aggregateOperatorToPercentile[mq.AggregateOperator] + + query = fmt.Sprintf(`SELECT %s ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTagsWithoutLe, value, query, groupByWithoutLe, orderWithoutLe) + return query, nil + case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax: + op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator]) + query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorCount: + op := "toFloat64(count(*))" + query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorCountDistinct: + op := "toFloat64(count(distinct(value)))" + query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) + return query, nil + case 
v3.AggregateOperatorNoOp: + queryTmpl := + "SELECT fingerprint, labels as fullLabels," + + " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," + + " any(value) as value" + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + + " GLOBAL INNER JOIN" + + " (%s) as filtered_time_series" + + " USING fingerprint" + + " WHERE " + samplesTableTimeFilter + + " GROUP BY fingerprint, labels, ts" + + " ORDER BY fingerprint, labels, ts" + query := fmt.Sprintf(queryTmpl, step, filterSubQuery) + return query, nil + default: + return "", fmt.Errorf("unsupported aggregate operator") + } +} diff --git a/pkg/query-service/app/metrics/v3/query_builder.go b/pkg/query-service/app/metrics/v3/query_builder.go index f7e3956cca..9511ca6424 100644 --- a/pkg/query-service/app/metrics/v3/query_builder.go +++ b/pkg/query-service/app/metrics/v3/query_builder.go @@ -48,9 +48,15 @@ var rateWithoutNegative = `if (runningDifference(value) < 0 OR runningDifference // buildMetricsTimeSeriesFilterQuery builds the sub-query to be used for filtering // timeseries based on search criteria -func buildMetricsTimeSeriesFilterQuery(fs *v3.FilterSet, groupTags []v3.AttributeKey, metricName string, aggregateOperator v3.AggregateOperator) (string, error) { +func buildMetricsTimeSeriesFilterQuery(fs *v3.FilterSet, groupTags []v3.AttributeKey, mq *v3.BuilderQuery) (string, error) { + metricName := mq.AggregateAttribute.Key + aggregateOperator := mq.AggregateOperator var conditions []string - conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(metricName))) + if mq.Temporality == v3.Delta { + conditions = append(conditions, fmt.Sprintf("metric_name = %s AND temporality = '%s' ", utils.ClickHouseFormattedValue(metricName), v3.Delta)) + } else { + conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(metricName))) + } if fs != nil && len(fs.Items) != 0 { for _, 
item := range fs.Items { @@ -157,7 +163,7 @@ func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str } } - filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq.AggregateAttribute.Key, mq.AggregateOperator) + filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq) if err != nil { return "", err } @@ -396,7 +402,13 @@ func reduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v } func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery) (string, error) { - query, err := buildMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME) + var query string + var err error + if mq.Temporality == v3.Delta { + query, err = buildDeltaMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME) + } else { + query, err = buildMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME) + } if err != nil { return "", err } diff --git a/pkg/query-service/app/metrics/v3/query_builder_test.go b/pkg/query-service/app/metrics/v3/query_builder_test.go index 7319236254..2c9a159413 100644 --- a/pkg/query-service/app/metrics/v3/query_builder_test.go +++ b/pkg/query-service/app/metrics/v3/query_builder_test.go @@ -228,7 +228,12 @@ func TestBuildQueryOperators(t *testing.T) { for i, tc := range testCases { t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { - whereClause, err := buildMetricsTimeSeriesFilterQuery(&tc.filterSet, []v3.AttributeKey{}, "signoz_calls_total", "sum") + mq := v3.BuilderQuery{ + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "signoz_calls_total"}, + AggregateOperator: v3.AggregateOperatorSum, + } + whereClause, err := buildMetricsTimeSeriesFilterQuery(&tc.filterSet, []v3.AttributeKey{}, &mq) require.NoError(t, err) require.Contains(t, whereClause, tc.expectedWhereClause) }) diff --git 
a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 293d3f8753..7e3a913fac 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -48,6 +48,7 @@ type ServerOptions struct { // alert specific params DisableRules bool RuleRepoURL string + PreferDelta bool } // Server runs HTTP, Mux and a grpc server @@ -125,6 +126,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { apiHandler, err := NewAPIHandler(APIHandlerOpts{ Reader: reader, SkipConfig: skipConfig, + PerferDelta: serverOptions.PreferDelta, AppDao: dao.DB(), RuleManager: rm, FeatureFlags: fm, diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index e9f1688450..638de6e4d8 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -56,7 +56,7 @@ type Reader interface { // Setter Interfaces SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) - FetchTemporality(ctx context.Context, metricNames []string) (map[string]model.Temporality, error) + FetchTemporality(ctx context.Context, metricNames []string) (map[string]map[v3.Temporality]bool, error) GetMetricAutocompleteMetricNames(ctx context.Context, matchText string, limit int) (*[]string, *model.ApiError) GetMetricAutocompleteTagKey(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError) GetMetricAutocompleteTagValue(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError) diff --git a/pkg/query-service/main.go b/pkg/query-service/main.go index 9d769a0940..7e8b8f4bf6 100644 --- a/pkg/query-service/main.go +++ b/pkg/query-service/main.go @@ -34,9 +34,12 @@ func main() { // the url used to build link in the alert messages in slack and other systems var ruleRepoURL string + var preferDelta bool + flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to 
read metrics)") flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)") flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)") + flag.BoolVar(&preferDelta, "prefer-delta", false, "(prefer delta over gauge)") flag.StringVar(&ruleRepoURL, "rules.repo-url", constants.AlertHelpPage, "(host address used to build rule link in alert messages)") flag.Parse() @@ -51,6 +54,7 @@ func main() { HTTPHostPort: constants.HTTPHostPort, PromConfigPath: promConfigPath, SkipTopLvlOpsPath: skipTopLvlOpsPath, + PreferDelta: preferDelta, PrivateHostPort: constants.PrivateHostPort, DisableRules: disableRules, RuleRepoURL: ruleRepoURL, diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index 312acae760..429471616f 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -18,18 +18,9 @@ type QueryRangeParams struct { Stats string } -type Temporality int8 - -const ( - Unspecified Temporality = iota - Delta - Cumulative -) - type MetricQuery struct { QueryName string `json:"queryName"` MetricName string `json:"metricName"` - Temporaltiy Temporality `json:"temporality"` TagFilters *FilterSet `json:"tagFilters,omitempty"` GroupingTags []string `json:"groupBy,omitempty"` AggregateOperator AggregateOperator `json:"aggregateOperator"` diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index 57f290f133..3b5fa37462 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -417,12 +417,21 @@ func (c *CompositeQuery) Validate() error { return nil } +type Temporality string + +const ( + Unspecified Temporality = "Unspecified" + Delta Temporality = "Delta" + Cumulative Temporality = "Cumulative" +) + type BuilderQuery struct { QueryName string `json:"queryName"` StepInterval int64 `json:"stepInterval"` DataSource DataSource `json:"dataSource"` AggregateOperator AggregateOperator 
`json:"aggregateOperator"` AggregateAttribute AttributeKey `json:"aggregateAttribute,omitempty"` + Temporality Temporality `json:"temporality,omitempty"` Filters *FilterSet `json:"filters,omitempty"` GroupBy []AttributeKey `json:"groupBy,omitempty"` Expression string `json:"expression"` From 4868addaa1c3e02d071093f5b66b4634b8dcd695 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Wed, 12 Jul 2023 09:15:06 +0530 Subject: [PATCH 07/10] feat: table view support for cumulative & delta metrics --- .../app/metrics/v3/cumulative_table.go | 175 ++++++++++++++++++ .../app/metrics/v3/delta_table.go | 157 ++++++++++++++++ .../app/metrics/v3/query_builder.go | 12 +- 3 files changed, 342 insertions(+), 2 deletions(-) create mode 100644 pkg/query-service/app/metrics/v3/cumulative_table.go create mode 100644 pkg/query-service/app/metrics/v3/delta_table.go diff --git a/pkg/query-service/app/metrics/v3/cumulative_table.go b/pkg/query-service/app/metrics/v3/cumulative_table.go new file mode 100644 index 0000000000..2a4b5b0e93 --- /dev/null +++ b/pkg/query-service/app/metrics/v3/cumulative_table.go @@ -0,0 +1,175 @@ +package v3 + +import ( + "fmt" + + "go.signoz.io/signoz/pkg/query-service/constants" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/utils" +) + +func buildMetricQueryForTable(start, end, step int64, mq *v3.BuilderQuery, tableName string) (string, error) { + + points := ((end - start + 1) / 1000) / step + + metricQueryGroupBy := mq.GroupBy + + // if the aggregate operator is a histogram quantile, and user has not forgotten + // the le tag in the group by then add the le tag to the group by + if mq.AggregateOperator == v3.AggregateOperatorHistQuant50 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant75 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant90 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant95 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant99 { + found := false + for _, tag 
:= range mq.GroupBy { + if tag.Key == "le" { + found = true + break + } + } + if !found { + metricQueryGroupBy = append( + metricQueryGroupBy, + v3.AttributeKey{ + Key: "le", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + IsColumn: false, + }, + ) + } + } + + filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq) + if err != nil { + return "", err + } + + samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) + + // Select the aggregate value for interval + queryTmplCounterInner := + "SELECT %s" + + " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," + + " %s as value" + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + + " GLOBAL INNER JOIN" + + " (%s) as filtered_time_series" + + " USING fingerprint" + + " WHERE " + samplesTableTimeFilter + + " GROUP BY %s" + + " ORDER BY %s ts" + + // Select the aggregate value for interval + queryTmpl := + "SELECT %s" + + " toStartOfHour(now()) as ts," + // hack to make the join work + " %s as value" + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + + " GLOBAL INNER JOIN" + + " (%s) as filtered_time_series" + + " USING fingerprint" + + " WHERE " + samplesTableTimeFilter + + " GROUP BY %s" + + " ORDER BY %s ts" + + // tagsWithoutLe is used to group by all tags except le + // This is done because we want to group by le only when we are calculating quantile + // Otherwise, we want to group by all tags except le + tagsWithoutLe := []string{} + for _, tag := range mq.GroupBy { + if tag.Key != "le" { + tagsWithoutLe = append(tagsWithoutLe, tag.Key) + } + } + + // orderWithoutLe := orderBy(mq.OrderBy, tagsWithoutLe) + + groupByWithoutLe := groupBy(tagsWithoutLe...) + groupTagsWithoutLe := groupSelect(tagsWithoutLe...) 
+ orderWithoutLe := orderBy(mq.OrderBy, tagsWithoutLe) + + groupBy := groupByAttributeKeyTags(metricQueryGroupBy...) + groupTags := groupSelectAttributeKeyTags(metricQueryGroupBy...) + orderBy := orderByAttributeKeyTags(mq.OrderBy, metricQueryGroupBy) + + if len(orderBy) != 0 { + orderBy += "," + } + if len(orderWithoutLe) != 0 { + orderWithoutLe += "," + } + + switch mq.AggregateOperator { + case v3.AggregateOperatorRate: + return "", fmt.Errorf("rate is not supported for table view") + case v3.AggregateOperatorSumRate, v3.AggregateOperatorAvgRate, v3.AggregateOperatorMaxRate, v3.AggregateOperatorMinRate: + rateGroupBy := "fingerprint, " + groupBy + rateGroupTags := "fingerprint, " + groupTags + rateOrderBy := "fingerprint, " + orderBy + op := "max(value)" + subQuery := fmt.Sprintf( + queryTmplCounterInner, rateGroupTags, step, op, filterSubQuery, rateGroupBy, rateOrderBy, + ) // labels will be same so any should be fine + query := `SELECT %s ts, ` + rateWithoutNegative + `as value FROM(%s) WHERE isNaN(value) = 0` + query = fmt.Sprintf(query, groupTags, subQuery) + query = fmt.Sprintf(`SELECT %s toStartOfHour(now()) as ts, %s(value)/%d as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTags, aggregateOperatorToSQLFunc[mq.AggregateOperator], points, query, groupBy, orderBy) + return query, nil + case + v3.AggregateOperatorRateSum, + v3.AggregateOperatorRateMax, + v3.AggregateOperatorRateAvg, + v3.AggregateOperatorRateMin: + step = ((end - start + 1) / 1000) / 2 + op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator]) + subQuery := fmt.Sprintf(queryTmplCounterInner, groupTags, step, op, filterSubQuery, groupBy, orderBy) + query := `SELECT %s toStartOfHour(now()) as ts, ` + rateWithoutNegative + `as value FROM(%s) WHERE isNaN(value) = 0` + query = fmt.Sprintf(query, groupTags, subQuery) + return query, nil + case + v3.AggregateOperatorP05, + v3.AggregateOperatorP10, + v3.AggregateOperatorP20, + v3.AggregateOperatorP25, + 
v3.AggregateOperatorP50, + v3.AggregateOperatorP75, + v3.AggregateOperatorP90, + v3.AggregateOperatorP95, + v3.AggregateOperatorP99: + op := fmt.Sprintf("quantile(%v)(value)", aggregateOperatorToPercentile[mq.AggregateOperator]) + query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorHistQuant50, v3.AggregateOperatorHistQuant75, v3.AggregateOperatorHistQuant90, v3.AggregateOperatorHistQuant95, v3.AggregateOperatorHistQuant99: + rateGroupBy := "fingerprint, " + groupBy + rateGroupTags := "fingerprint, " + groupTags + rateOrderBy := "fingerprint, " + orderBy + op := "max(value)" + subQuery := fmt.Sprintf( + queryTmplCounterInner, rateGroupTags, step, op, filterSubQuery, rateGroupBy, rateOrderBy, + ) // labels will be same so any should be fine + query := `SELECT %s ts, ` + rateWithoutNegative + ` as value FROM(%s) WHERE isNaN(value) = 0` + query = fmt.Sprintf(query, groupTags, subQuery) + query = fmt.Sprintf(`SELECT %s toStartOfHour(now()) as ts, sum(value)/%d as value FROM (%s) GROUP BY %s HAVING isNaN(value) = 0 ORDER BY %s ts`, groupTags, points, query, groupBy, orderBy) + value := aggregateOperatorToPercentile[mq.AggregateOperator] + + query = fmt.Sprintf(`SELECT %s toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTagsWithoutLe, value, query, groupByWithoutLe, orderWithoutLe) + return query, nil + case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax: + op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator]) + query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorCount: + op := "toFloat64(count(*))" + query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy) + return query, nil + case 
v3.AggregateOperatorCountDistinct: + op := "toFloat64(count(distinct(value)))" + query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorNoOp: + return "", fmt.Errorf("noop is not supported for table view") + default: + return "", fmt.Errorf("unsupported aggregate operator") + } +} diff --git a/pkg/query-service/app/metrics/v3/delta_table.go b/pkg/query-service/app/metrics/v3/delta_table.go new file mode 100644 index 0000000000..53a26efe53 --- /dev/null +++ b/pkg/query-service/app/metrics/v3/delta_table.go @@ -0,0 +1,157 @@ +package v3 + +import ( + "fmt" + + "go.signoz.io/signoz/pkg/query-service/constants" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/utils" +) + +func buildDeltaMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tableName string) (string, error) { + + step := ((end - start + 1) / 1000) + + metricQueryGroupBy := mq.GroupBy + + // if the aggregate operator is a histogram quantile, and user has not forgotten + // the le tag in the group by then add the le tag to the group by + if mq.AggregateOperator == v3.AggregateOperatorHistQuant50 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant75 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant90 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant95 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant99 { + found := false + for _, tag := range mq.GroupBy { + if tag.Key == "le" { + found = true + break + } + } + if !found { + metricQueryGroupBy = append( + metricQueryGroupBy, + v3.AttributeKey{ + Key: "le", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + IsColumn: false, + }, + ) + } + } + + // Additional filter for delta + if mq.Filters != nil { + mq.Filters.Items = append(mq.Filters.Items, v3.FilterItem{ + Key: v3.AttributeKey{Key: "__temporality__"}, + Operator: v3.FilterOperatorEqual, + Value: "Delta", + }) + } + + 
filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq) + if err != nil { + return "", err + } + + samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) + + // Select the aggregate value for interval + queryTmpl := + "SELECT %s toStartOfHour(now()) as ts," + + " %s as value" + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + + " GLOBAL INNER JOIN" + + " (%s) as filtered_time_series" + + " USING fingerprint" + + " WHERE " + samplesTableTimeFilter + + " GROUP BY %s" + + " ORDER BY %s ts" + + // tagsWithoutLe is used to group by all tags except le + // This is done because we want to group by le only when we are calculating quantile + // Otherwise, we want to group by all tags except le + tagsWithoutLe := []string{} + for _, tag := range mq.GroupBy { + if tag.Key != "le" { + tagsWithoutLe = append(tagsWithoutLe, tag.Key) + } + } + + orderWithoutLe := orderBy(mq.OrderBy, tagsWithoutLe) + + groupByWithoutLeTable := groupBy(tagsWithoutLe...) + groupTagsWithoutLeTable := groupSelect(tagsWithoutLe...) + orderWithoutLeTable := orderBy(mq.OrderBy, tagsWithoutLe) + + groupBy := groupByAttributeKeyTags(metricQueryGroupBy...) + groupTags := groupSelectAttributeKeyTags(metricQueryGroupBy...) 
+ orderBy := orderByAttributeKeyTags(mq.OrderBy, metricQueryGroupBy) + + if len(orderBy) != 0 { + orderBy += "," + } + if len(orderWithoutLe) != 0 { + orderWithoutLe += "," + } + + switch mq.AggregateOperator { + case v3.AggregateOperatorRate: + return "", fmt.Errorf("rate is not supported for table view") + case v3.AggregateOperatorSumRate, v3.AggregateOperatorAvgRate, v3.AggregateOperatorMaxRate, v3.AggregateOperatorMinRate: + op := fmt.Sprintf("%s(value)/%d", aggregateOperatorToSQLFunc[mq.AggregateOperator], step) + query := fmt.Sprintf( + queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy, + ) + return query, nil + case + v3.AggregateOperatorRateSum, + v3.AggregateOperatorRateMax, + v3.AggregateOperatorRateAvg, + v3.AggregateOperatorRateMin: + op := fmt.Sprintf("%s(value)/%d", aggregateOperatorToSQLFunc[mq.AggregateOperator], step) + query := fmt.Sprintf( + queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy, + ) + return query, nil + case + v3.AggregateOperatorP05, + v3.AggregateOperatorP10, + v3.AggregateOperatorP20, + v3.AggregateOperatorP25, + v3.AggregateOperatorP50, + v3.AggregateOperatorP75, + v3.AggregateOperatorP90, + v3.AggregateOperatorP95, + v3.AggregateOperatorP99: + op := fmt.Sprintf("quantile(%v)(value)", aggregateOperatorToPercentile[mq.AggregateOperator]) + query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorHistQuant50, v3.AggregateOperatorHistQuant75, v3.AggregateOperatorHistQuant90, v3.AggregateOperatorHistQuant95, v3.AggregateOperatorHistQuant99: + op := fmt.Sprintf("sum(value)/%d", step) + query := fmt.Sprintf( + queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy, + ) // labels will be same so any should be fine + value := aggregateOperatorToPercentile[mq.AggregateOperator] + + query = fmt.Sprintf(`SELECT %s ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s`, 
groupTagsWithoutLeTable, value, query, groupByWithoutLeTable, orderWithoutLeTable) + return query, nil + case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax: + op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator]) + query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorCount: + op := "toFloat64(count(*))" + query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorCountDistinct: + op := "toFloat64(count(distinct(value)))" + query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorNoOp: + return "", fmt.Errorf("noop is not supported for table view") + default: + return "", fmt.Errorf("unsupported aggregate operator") + } +} diff --git a/pkg/query-service/app/metrics/v3/query_builder.go b/pkg/query-service/app/metrics/v3/query_builder.go index 9511ca6424..58bf3a61bc 100644 --- a/pkg/query-service/app/metrics/v3/query_builder.go +++ b/pkg/query-service/app/metrics/v3/query_builder.go @@ -405,9 +405,17 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P var query string var err error if mq.Temporality == v3.Delta { - query, err = buildDeltaMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME) + if panelType == v3.PanelTypeTable { + query, err = buildDeltaMetricQueryForTable(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME) + } else { + query, err = buildDeltaMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME) + } } else { - query, err = buildMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME) + if panelType == v3.PanelTypeTable { + query, err = buildMetricQueryForTable(start, end, mq.StepInterval, mq, 
constants.SIGNOZ_TIMESERIES_TABLENAME) + } else { + query, err = buildMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME) + } } if err != nil { return "", err From 9b22de63a17d704677521621c74a4dc0113cf9fd Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Thu, 13 Jul 2023 20:21:29 +0530 Subject: [PATCH 08/10] chore: update tests --- .../app/metrics/v3/cumulative_table.go | 22 ++++- .../app/metrics/v3/cumulative_table_test.go | 99 +++++++++++++++++++ .../app/metrics/v3/delta_table.go | 25 ++--- .../app/metrics/v3/delta_table_test.go | 99 +++++++++++++++++++ 4 files changed, 226 insertions(+), 19 deletions(-) create mode 100644 pkg/query-service/app/metrics/v3/cumulative_table_test.go create mode 100644 pkg/query-service/app/metrics/v3/delta_table_test.go diff --git a/pkg/query-service/app/metrics/v3/cumulative_table.go b/pkg/query-service/app/metrics/v3/cumulative_table.go index 2a4b5b0e93..22b9853168 100644 --- a/pkg/query-service/app/metrics/v3/cumulative_table.go +++ b/pkg/query-service/app/metrics/v3/cumulative_table.go @@ -2,13 +2,31 @@ package v3 import ( "fmt" + "math" "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/utils" ) -func buildMetricQueryForTable(start, end, step int64, mq *v3.BuilderQuery, tableName string) (string, error) { +// This logic is little convoluted for a reason. +// When we work with cumulative metrics, the table view need to show the data for the entire time range. +// In some cases, we could take the points at the start and end of the time range and divide it by the +// duration. But, the problem is there is no guarantee that the trend will be linear between the start and end. +// We can sum the rate of change for some interval X, this interval can be step size of time series. +// However, the speed of query depends on the number of timestamps, so we bump up the 5x the step size. 
+// This should be a good balance between speed and accuracy. +// TODO: find a better way to do this +func stepForTableCumulative(start, end int64) int64 { + // round up to the nearest multiple of 60 + duration := (end - start + 1) / 1000 + step := math.Max(math.Floor(float64(duration)/150), 60) // assuming 150 max points + return int64(step) * 5 +} + +func buildMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tableName string) (string, error) { + + step := stepForTableCumulative(start, end) points := ((end - start + 1) / 1000) / step @@ -64,7 +82,7 @@ func buildMetricQueryForTable(start, end, step int64, mq *v3.BuilderQuery, table // Select the aggregate value for interval queryTmpl := "SELECT %s" + - " toStartOfHour(now()) as ts," + // hack to make the join work + " toStartOfHour(now()) as ts," + // now() has no menaing & used as a placeholder for ts " %s as value" + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + " GLOBAL INNER JOIN" + diff --git a/pkg/query-service/app/metrics/v3/cumulative_table_test.go b/pkg/query-service/app/metrics/v3/cumulative_table_test.go new file mode 100644 index 0000000000..a9c14ca5c6 --- /dev/null +++ b/pkg/query-service/app/metrics/v3/cumulative_table_test.go @@ -0,0 +1,99 @@ +package v3 + +import ( + "testing" + + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func TestPanelTableForCumulative(t *testing.T) { + cases := []struct { + name string + query *v3.BuilderQuery + expected string + }{ + { + name: "request rate", + query: &v3.BuilderQuery{ + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateOperator: v3.AggregateOperatorSumRate, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_latency_count", + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "service_name"}, + Operator: v3.FilterOperatorIn, + Value: []interface{}{"frontend"}, + }, + { + Key: v3.AttributeKey{Key: "operation"}, + 
Operator: v3.FilterOperatorIn, + Value: []interface{}{"HTTP GET /dispatch"}, + }, + }, + }, + Expression: "A", + }, + expected: "SELECT toStartOfHour(now()) as ts, sum(value)/5 as value FROM (SELECT ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 300 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(value) = 0) GROUP BY ts ORDER BY ts", + }, + { + name: "latency p50", + query: &v3.BuilderQuery{ + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateOperator: v3.AggregateOperatorHistQuant50, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_latency_bucket", + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "service_name"}, + Operator: v3.FilterOperatorEqual, + Value: "frontend", + }, + }, + }, + Expression: "A", + }, + expected: "SELECT toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(value)/5 as value FROM (SELECT le, ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, le, 
toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 300 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, le,ts ORDER BY fingerprint, le ASC, ts) WHERE isNaN(value) = 0) GROUP BY le,ts HAVING isNaN(value) = 0 ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts", + }, + { + name: "latency p99 with group by", + query: &v3.BuilderQuery{ + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateOperator: v3.AggregateOperatorHistQuant99, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_latency_bucket", + }, + Temporality: v3.Cumulative, + GroupBy: []v3.AttributeKey{ + { + Key: "service_name", + }, + }, + Expression: "A", + }, + expected: "SELECT service_name, toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(value)/5 as value FROM (SELECT service_name,le, ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 300 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality 
IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, service_name,le,ts ORDER BY fingerprint, service_name ASC,le ASC, ts) WHERE isNaN(value) = 0) GROUP BY service_name,le,ts HAVING isNaN(value) = 0 ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts", + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + query, err := buildMetricQueryForTable(1689255866000, 1689257640000, 1800, c.query, "distributed_time_series_v2") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if query != c.expected { + t.Fatalf("expected: %s, got: %s", c.expected, query) + } + }) + } +} diff --git a/pkg/query-service/app/metrics/v3/delta_table.go b/pkg/query-service/app/metrics/v3/delta_table.go index 53a26efe53..63cbaf72a2 100644 --- a/pkg/query-service/app/metrics/v3/delta_table.go +++ b/pkg/query-service/app/metrics/v3/delta_table.go @@ -2,6 +2,7 @@ package v3 import ( "fmt" + "math" "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" @@ -10,7 +11,8 @@ import ( func buildDeltaMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tableName string) (string, error) { - step := ((end - start + 1) / 1000) + // round up to the nearest multiple of 60 + step := int64(math.Ceil(float64(end-start+1)/1000/60) * 60) metricQueryGroupBy := mq.GroupBy @@ -41,15 +43,6 @@ func buildDeltaMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tab } } - // Additional filter for delta - if mq.Filters != nil { - mq.Filters.Items = append(mq.Filters.Items, v3.FilterItem{ - Key: v3.AttributeKey{Key: "__temporality__"}, - Operator: v3.FilterOperatorEqual, - Value: "Delta", - }) - } - filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq) if err != nil { return "", err @@ -57,9 
+50,8 @@ func buildDeltaMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tab samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) - // Select the aggregate value for interval queryTmpl := - "SELECT %s toStartOfHour(now()) as ts," + + "SELECT %s toStartOfHour(now()) as ts," + // now() has no menaing & used as a placeholder for ts " %s as value" + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + " GLOBAL INNER JOIN" + @@ -79,8 +71,6 @@ func buildDeltaMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tab } } - orderWithoutLe := orderBy(mq.OrderBy, tagsWithoutLe) - groupByWithoutLeTable := groupBy(tagsWithoutLe...) groupTagsWithoutLeTable := groupSelect(tagsWithoutLe...) orderWithoutLeTable := orderBy(mq.OrderBy, tagsWithoutLe) @@ -92,12 +82,13 @@ func buildDeltaMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tab if len(orderBy) != 0 { orderBy += "," } - if len(orderWithoutLe) != 0 { - orderWithoutLe += "," + if len(orderWithoutLeTable) != 0 { + orderWithoutLeTable += "," } switch mq.AggregateOperator { case v3.AggregateOperatorRate: + // TODO(srikanthccv): what should be the expected behavior here for metrics? 
return "", fmt.Errorf("rate is not supported for table view") case v3.AggregateOperatorSumRate, v3.AggregateOperatorAvgRate, v3.AggregateOperatorMaxRate, v3.AggregateOperatorMinRate: op := fmt.Sprintf("%s(value)/%d", aggregateOperatorToSQLFunc[mq.AggregateOperator], step) @@ -135,7 +126,7 @@ func buildDeltaMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tab ) // labels will be same so any should be fine value := aggregateOperatorToPercentile[mq.AggregateOperator] - query = fmt.Sprintf(`SELECT %s ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s`, groupTagsWithoutLeTable, value, query, groupByWithoutLeTable, orderWithoutLeTable) + query = fmt.Sprintf(`SELECT %s ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTagsWithoutLeTable, value, query, groupByWithoutLeTable, orderWithoutLeTable) return query, nil case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax: op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator]) diff --git a/pkg/query-service/app/metrics/v3/delta_table_test.go b/pkg/query-service/app/metrics/v3/delta_table_test.go new file mode 100644 index 0000000000..5156c0b71d --- /dev/null +++ b/pkg/query-service/app/metrics/v3/delta_table_test.go @@ -0,0 +1,99 @@ +package v3 + +import ( + "testing" + + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func TestPanelTableForDelta(t *testing.T) { + cases := []struct { + name string + query *v3.BuilderQuery + expected string + }{ + { + name: "request rate", + query: &v3.BuilderQuery{ + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateOperator: v3.AggregateOperatorSumRate, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_latency_count", + }, + Temporality: v3.Delta, + Filters: &v3.FilterSet{ + Items: []v3.FilterItem{ + { + 
Key: v3.AttributeKey{Key: "service_name"}, + Operator: v3.FilterOperatorIn, + Value: []interface{}{"frontend"}, + }, + { + Key: v3.AttributeKey{Key: "operation"}, + Operator: v3.FilterOperatorIn, + Value: []interface{}{"HTTP GET /dispatch"}, + }, + }, + }, + Expression: "A", + }, + expected: "SELECT toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY ts ORDER BY ts", + }, + { + name: "latency p50", + query: &v3.BuilderQuery{ + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateOperator: v3.AggregateOperatorHistQuant50, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_latency_bucket", + }, + Temporality: v3.Delta, + Filters: &v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "service_name"}, + Operator: v3.FilterOperatorEqual, + Value: "frontend", + }, + }, + }, + Expression: "A", + }, + expected: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY le,ts ORDER BY le 
ASC, ts) GROUP BY ts ORDER BY ts", + }, + { + name: "latency p99 with group by", + query: &v3.BuilderQuery{ + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateOperator: v3.AggregateOperatorHistQuant99, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_latency_bucket", + }, + Temporality: v3.Delta, + GroupBy: []v3.AttributeKey{ + { + Key: "service_name", + }, + }, + Expression: "A", + }, + expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts", + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + query, err := buildDeltaMetricQueryForTable(1689255866000, 1689257640000, 1800, c.query, "distributed_time_series_v2") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if query != c.expected { + t.Fatalf("expected: %s, got: %s", c.expected, query) + } + }) + } +} From 495692c72530d1a239b67892df88d156c6a1023b Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Fri, 14 Jul 2023 14:27:03 +0530 Subject: [PATCH 09/10] chore: adjust step --- pkg/query-service/app/metrics/v3/cumulative_table.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/query-service/app/metrics/v3/cumulative_table.go b/pkg/query-service/app/metrics/v3/cumulative_table.go index 
22b9853168..fbd5c27447 100644 --- a/pkg/query-service/app/metrics/v3/cumulative_table.go +++ b/pkg/query-service/app/metrics/v3/cumulative_table.go @@ -14,14 +14,17 @@ import ( // In some cases, we could take the points at the start and end of the time range and divide it by the // duration. But, the problem is there is no guarantee that the trend will be linear between the start and end. // We can sum the rate of change for some interval X, this interval can be step size of time series. -// However, the speed of query depends on the number of timestamps, so we bump up the 5x the step size. +// However, the speed of query depends on the number of timestamps, so we bump up the step size (5x) for longer durations. // This should be a good balance between speed and accuracy. // TODO: find a better way to do this func stepForTableCumulative(start, end int64) int64 { // round up to the nearest multiple of 60 duration := (end - start + 1) / 1000 - step := math.Max(math.Floor(float64(duration)/150), 60) // assuming 150 max points - return int64(step) * 5 + step := math.Max(math.Floor(float64(duration)/120), 60) // assuming 120 max points + if duration > 1800 { // bump for longer duration + step = step * 5 + } + return int64(step) } func buildMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tableName string) (string, error) { From 457ada91b086d326ae2a99cb95aa24b5f5744d94 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Fri, 14 Jul 2023 14:33:43 +0530 Subject: [PATCH 10/10] chore: fix tests --- pkg/query-service/app/metrics/v3/cumulative_table_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/query-service/app/metrics/v3/cumulative_table_test.go b/pkg/query-service/app/metrics/v3/cumulative_table_test.go index a9c14ca5c6..6c79c70bde 100644 --- a/pkg/query-service/app/metrics/v3/cumulative_table_test.go +++ b/pkg/query-service/app/metrics/v3/cumulative_table_test.go @@ -38,7 +38,7 @@ func TestPanelTableForCumulative(t *testing.T) { },
Expression: "A", }, - expected: "SELECT toStartOfHour(now()) as ts, sum(value)/5 as value FROM (SELECT ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 300 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(value) = 0) GROUP BY ts ORDER BY ts", + expected: "SELECT toStartOfHour(now()) as ts, sum(value)/29 as value FROM (SELECT ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(value) = 0) GROUP BY ts ORDER BY ts", }, { 
name: "latency p50", @@ -61,7 +61,7 @@ func TestPanelTableForCumulative(t *testing.T) { }, Expression: "A", }, - expected: "SELECT toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(value)/5 as value FROM (SELECT le, ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 300 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, le,ts ORDER BY fingerprint, le ASC, ts) WHERE isNaN(value) = 0) GROUP BY le,ts HAVING isNaN(value) = 0 ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts", + expected: "SELECT toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(value)/29 as value FROM (SELECT le, ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE 
metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, le,ts ORDER BY fingerprint, le ASC, ts) WHERE isNaN(value) = 0) GROUP BY le,ts HAVING isNaN(value) = 0 ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts", }, { name: "latency p99 with group by", @@ -80,7 +80,7 @@ func TestPanelTableForCumulative(t *testing.T) { }, Expression: "A", }, - expected: "SELECT service_name, toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(value)/5 as value FROM (SELECT service_name,le, ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 300 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, service_name,le,ts ORDER BY fingerprint, service_name ASC,le ASC, ts) WHERE isNaN(value) = 0) GROUP BY service_name,le,ts HAVING isNaN(value) = 0 ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts", + expected: "SELECT service_name, toStartOfHour(now()) as ts, 
histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(value)/29 as value FROM (SELECT service_name,le, ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, service_name,le,ts ORDER BY fingerprint, service_name ASC,le ASC, ts) WHERE isNaN(value) = 0) GROUP BY service_name,le,ts HAVING isNaN(value) = 0 ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts", }, }