From 305f0ebf0c161703d9cc779a8af9df3036a26d3b Mon Sep 17 00:00:00 2001 From: Lukasz Mierzwa Date: Fri, 4 Aug 2023 15:45:06 +0100 Subject: [PATCH] Collect query stats and use it in query/cost --- docs/changelog.md | 8 ++ docs/checks/query/cost.md | 68 ++++++++- internal/checks/base_test.go | 16 ++- internal/checks/query_cost.go | 53 ++++++- internal/checks/query_cost_test.go | 224 +++++++++++++++++++++++++++-- internal/config/cost.go | 18 ++- internal/config/cost_test.go | 18 +++ internal/config/rule.go | 3 +- internal/promapi/prometheus.go | 24 +++- internal/promapi/query.go | 47 +++++- internal/promapi/query_test.go | 52 +++++++ internal/promapi/range.go | 56 +++++++- internal/promapi/range_test.go | 86 +++++++++++ 13 files changed, 626 insertions(+), 47 deletions(-) diff --git a/docs/changelog.md b/docs/changelog.md index 4ffc449c..449a2454 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -1,5 +1,13 @@ # Changelog +## v0.45.0 + +### Added + +- The `query/cost` check can now use Prometheus query stats to verify query + evaluation time and the number of samples used by a query. See + [query/cost](checks/query/cost.md) docs for details. + ## v0.44.2 ### Fixed diff --git a/docs/checks/query/cost.md b/docs/checks/query/cost.md index 84a4c556..de922160 100644 --- a/docs/checks/query/cost.md +++ b/docs/checks/query/cost.md @@ -9,11 +9,41 @@ grand_parent: Documentation This check is used to calculate cost of a query and optionally report an issue if that cost is too high. It will run `expr` query from every rule against selected Prometheus servers and report results. -This check can be used for both recording and alerting rules, but is most +This check can be used for both recording and alerting rules, but is mostly useful for recording rules. +## Query evaluation duration + +The total duration of a query comes from Prometheus query stats included +in the API response when `?stats=1` is passed. 
+When enabled pint can report if `evalTotalTime` is higher than configured limit, +which can be used either for informational purpose or to fail checks on queries +that are too expensive (depending on configured `severity`). + +## Query evaluation samples + +Similar to evaluation duration this information comes from Prometheus query stats. +There are two different stats that give us information about the number of samples +used by given query: + +- `totalQueryableSamples` - the total number of samples read during the query execution. +- `peakSamples` - the max samples kept in memory during the query execution and shows +how close the query was to reach the `--query.max-samples` limit. + +In general higher `totalQueryableSamples` means that a query either reads a lot of +time series and/or queries a large time range, both translating into longer query +execution times. +Looking at `peakSamples` on the other hand can be useful to find queries that are +complex and perform some operation on a large number of time series, for example +when you run `max(...)` on a query that returns a huge number of results. + +## Series returned by the query + +For recording rules anything returned by the query will be saved into Prometheus +as new time series. Checking how many time series a rule returns allows us +to estimate how much extra memory will be needed. `pint` will try to estimate the number of bytes needed per single time series -and use that to estimate the amount of memory needed for all time series +and use that to estimate the amount of memory needed to store all the time series returned by given query. The `bytes per time series` number is calculated using this query: @@ -23,7 +53,7 @@ avg(avg_over_time(go_memstats_alloc_bytes[2h]) / avg_over_time(prometheus_tsdb_h Since Go uses garbage collector total Prometheus process memory will be more than the sum of all memory allocations, depending on many factors like memory pressure, -Go version, GOGC settings etc. 
The estimate `pint` gives you should be considered +Go version, `GOGC` settings etc. The estimate `pint` gives you should be considered `best case` scenario. ## Configuration @@ -32,8 +62,11 @@ Syntax: ```js cost { - severity = "bug|warning|info" - maxSeries = 5000 + severity = "bug|warning|info" + maxSeries = 5000 + maxPeakSamples = 10000 + maxTotalSamples = 200000 + maxEvaluationDuration = "1m" } ``` @@ -43,6 +76,15 @@ cost { report it as information. - `maxSeries` - if set and number of results for given query exceeds this value it will be reported as a bug (or custom severity if `severity` is set). +- `maxPeakSamples` - setting this to a non-zero value will tell pint to report + any query that has higher `peakSamples` values than the value configured here. + Nothing will be reported if this option is not set. +- `maxTotalSamples` - setting this to a non-zero value will tell pint to report + any query that has higher `totalQueryableSamples` values than the value + configured here. Nothing will be reported if this option is not set. +- `maxEvaluationDuration` - setting this to a non-zero value will tell pint to + report any query that has higher `evalTotalTime` values than the value + configured here. Nothing will be reported if this option is not set. ## How to enable it @@ -68,6 +110,22 @@ rule { } ``` +Fail checks if any recording rule is using more than 300000 peak samples +or if it's taking more than 30 seconds to evaluate. 
+ +```js +rule { + match { + kind = "recording" + } + cost { + maxPeakSamples = 300000 + maxEvaluationDuration = "30s" + severity = "bug" + } +} +``` + ## How to disable it You can disable this check globally by adding this config block: diff --git a/internal/checks/base_test.go b/internal/checks/base_test.go index eaadb0c4..46e1c23e 100644 --- a/internal/checks/base_test.go +++ b/internal/checks/base_test.go @@ -298,6 +298,7 @@ func (pe promError) respond(w http.ResponseWriter, _ *http.Request) { type vectorResponse struct { samples model.Vector + stats promapi.QueryStats } func (vr vectorResponse) respond(w http.ResponseWriter, _ *http.Request) { @@ -306,17 +307,20 @@ func (vr vectorResponse) respond(w http.ResponseWriter, _ *http.Request) { result := struct { Status string `json:"status"` Data struct { - ResultType string `json:"resultType"` - Result model.Vector `json:"result"` + ResultType string `json:"resultType"` + Result model.Vector `json:"result"` + Stats promapi.QueryStats `json:"stats"` } `json:"data"` }{ Status: "success", Data: struct { - ResultType string `json:"resultType"` - Result model.Vector `json:"result"` + ResultType string `json:"resultType"` + Result model.Vector `json:"result"` + Stats promapi.QueryStats `json:"stats"` }{ ResultType: "vector", Result: vr.samples, + Stats: vr.stats, }, } d, err := json.MarshalIndent(result, "", " ") @@ -328,6 +332,7 @@ func (vr vectorResponse) respond(w http.ResponseWriter, _ *http.Request) { type matrixResponse struct { samples []*model.SampleStream + stats promapi.QueryStats } func (mr matrixResponse) respond(w http.ResponseWriter, r *http.Request) { @@ -357,15 +362,18 @@ func (mr matrixResponse) respond(w http.ResponseWriter, r *http.Request) { Data struct { ResultType string `json:"resultType"` Result []*model.SampleStream `json:"result"` + Stats promapi.QueryStats `json:"stats"` } `json:"data"` }{ Status: "success", Data: struct { ResultType string `json:"resultType"` Result []*model.SampleStream 
`json:"result"` + Stats promapi.QueryStats `json:"stats"` }{ ResultType: "matrix", Result: samples, + Stats: mr.stats, }, } d, err := json.MarshalIndent(result, "", " ") diff --git a/internal/checks/query_cost.go b/internal/checks/query_cost.go index 4f7131f6..a39b967e 100644 --- a/internal/checks/query_cost.go +++ b/internal/checks/query_cost.go @@ -3,6 +3,7 @@ package checks import ( "context" "fmt" + "time" "github.com/cloudflare/pint/internal/discovery" "github.com/cloudflare/pint/internal/output" @@ -15,18 +16,24 @@ const ( BytesPerSampleQuery = "avg(avg_over_time(go_memstats_alloc_bytes[2h]) / avg_over_time(prometheus_tsdb_head_series[2h]))" ) -func NewCostCheck(prom *promapi.FailoverGroup, maxSeries int, severity Severity) CostCheck { +func NewCostCheck(prom *promapi.FailoverGroup, maxSeries, maxTotalSamples, maxPeakSamples int, maxEvaluationDuration time.Duration, severity Severity) CostCheck { return CostCheck{ - prom: prom, - maxSeries: maxSeries, - severity: severity, + prom: prom, + maxSeries: maxSeries, + maxTotalSamples: maxTotalSamples, + maxPeakSamples: maxPeakSamples, + maxEvaluationDuration: maxEvaluationDuration, + severity: severity, } } type CostCheck struct { - prom *promapi.FailoverGroup - maxSeries int - severity Severity + prom *promapi.FailoverGroup + maxSeries int + maxTotalSamples int + maxPeakSamples int + maxEvaluationDuration time.Duration + severity Severity } func (c CostCheck) Meta() CheckMeta { @@ -95,5 +102,37 @@ func (c CostCheck) Check(ctx context.Context, _ string, rule parser.Rule, _ []di Text: fmt.Sprintf("%s returned %d result(s)%s%s", promText(c.prom.Name(), qr.URI), series, estimate, above), Severity: severity, }) + + if c.maxTotalSamples > 0 && qr.Stats.Samples.TotalQueryableSamples > c.maxTotalSamples { + problems = append(problems, Problem{ + Fragment: expr.Value.Value, + Lines: expr.Lines(), + Reporter: c.Reporter(), + Text: fmt.Sprintf("%s queried %d samples in total when executing this query, which is more than the 
configured limit of %d", promText(c.prom.Name(), qr.URI), qr.Stats.Samples.TotalQueryableSamples, c.maxTotalSamples), + Severity: c.severity, + }) + } + + if c.maxPeakSamples > 0 && qr.Stats.Samples.PeakSamples > c.maxPeakSamples { + problems = append(problems, Problem{ + Fragment: expr.Value.Value, + Lines: expr.Lines(), + Reporter: c.Reporter(), + Text: fmt.Sprintf("%s queried %d peak samples when executing this query, which is more than the configured limit of %d", promText(c.prom.Name(), qr.URI), qr.Stats.Samples.PeakSamples, c.maxPeakSamples), + Severity: c.severity, + }) + } + + evalDur := time.Duration(qr.Stats.Timings.EvalTotalTime * float64(time.Second)) + if c.maxEvaluationDuration > 0 && evalDur > c.maxEvaluationDuration { + problems = append(problems, Problem{ + Fragment: expr.Value.Value, + Lines: expr.Lines(), + Reporter: c.Reporter(), + Text: fmt.Sprintf("%s took %s when executing this query, which is more than the configured limit of %s", promText(c.prom.Name(), qr.URI), output.HumanizeDuration(evalDur), output.HumanizeDuration(c.maxEvaluationDuration)), + Severity: c.severity, + }) + } + return problems } diff --git a/internal/checks/query_cost_test.go b/internal/checks/query_cost_test.go index 394a8f1d..04931ed6 100644 --- a/internal/checks/query_cost_test.go +++ b/internal/checks/query_cost_test.go @@ -23,6 +23,18 @@ func maxSeriesText(m int) string { return fmt.Sprintf(", maximum allowed series is %d", m) } +func evalDurText(name, uri, dur, limit string) string { + return fmt.Sprintf(`prometheus %q at %s took %s when executing this query, which is more than the configured limit of %s`, name, uri, dur, limit) +} + +func totalSamplesText(name, uri string, total, limit int) string { + return fmt.Sprintf(`prometheus %q at %s queried %d samples in total when executing this query, which is more than the configured limit of %d`, name, uri, total, limit) +} + +func peakSamplesText(name, uri string, total, limit int) string { + return 
fmt.Sprintf(`prometheus %q at %s queried %d peak samples when executing this query, which is more than the configured limit of %d`, name, uri, total, limit) +} + func TestCostCheck(t *testing.T) { content := "- record: foo\n expr: sum(foo)\n" @@ -31,7 +43,7 @@ func TestCostCheck(t *testing.T) { description: "ignores rules with syntax errors", content: "- record: foo\n expr: sum(foo) without(\n", checker: func(prom *promapi.FailoverGroup) checks.RuleChecker { - return checks.NewCostCheck(prom, 0, checks.Bug) + return checks.NewCostCheck(prom, 0, 0, 0, 0, checks.Bug) }, prometheus: newSimpleProm, problems: noProblems, @@ -40,7 +52,7 @@ func TestCostCheck(t *testing.T) { description: "empty response", content: content, checker: func(prom *promapi.FailoverGroup) checks.RuleChecker { - return checks.NewCostCheck(prom, 0, checks.Bug) + return checks.NewCostCheck(prom, 0, 0, 0, 0, checks.Bug) }, prometheus: newSimpleProm, problems: func(uri string) []checks.Problem { @@ -68,7 +80,7 @@ func TestCostCheck(t *testing.T) { description: "response timeout", content: content, checker: func(prom *promapi.FailoverGroup) checks.RuleChecker { - return checks.NewCostCheck(prom, 0, checks.Bug) + return checks.NewCostCheck(prom, 0, 0, 0, 0, checks.Bug) }, prometheus: func(uri string) *promapi.FailoverGroup { return simpleProm("prom", uri, time.Millisecond*50, true) @@ -98,7 +110,7 @@ func TestCostCheck(t *testing.T) { description: "bad request", content: content, checker: func(prom *promapi.FailoverGroup) checks.RuleChecker { - return checks.NewCostCheck(prom, 0, checks.Bug) + return checks.NewCostCheck(prom, 0, 0, 0, 0, checks.Bug) }, prometheus: newSimpleProm, problems: func(uri string) []checks.Problem { @@ -126,7 +138,7 @@ func TestCostCheck(t *testing.T) { description: "connection refused", content: content, checker: func(prom *promapi.FailoverGroup) checks.RuleChecker { - return checks.NewCostCheck(prom, 0, checks.Bug) + return checks.NewCostCheck(prom, 0, 0, 0, 0, checks.Bug) }, 
prometheus: func(s string) *promapi.FailoverGroup { return simpleProm("prom", "http://127.0.0.1:1111", time.Second*5, false) @@ -147,7 +159,7 @@ func TestCostCheck(t *testing.T) { description: "1 result", content: content, checker: func(prom *promapi.FailoverGroup) checks.RuleChecker { - return checks.NewCostCheck(prom, 0, checks.Bug) + return checks.NewCostCheck(prom, 0, 0, 0, 0, checks.Bug) }, prometheus: newSimpleProm, problems: func(uri string) []checks.Problem { @@ -186,7 +198,7 @@ func TestCostCheck(t *testing.T) { description: "7 results", content: content, checker: func(prom *promapi.FailoverGroup) checks.RuleChecker { - return checks.NewCostCheck(prom, 0, checks.Bug) + return checks.NewCostCheck(prom, 0, 0, 0, 0, checks.Bug) }, prometheus: newSimpleProm, problems: func(uri string) []checks.Problem { @@ -235,7 +247,7 @@ func TestCostCheck(t *testing.T) { description: "7 result with MB", content: content, checker: func(prom *promapi.FailoverGroup) checks.RuleChecker { - return checks.NewCostCheck(prom, 0, checks.Bug) + return checks.NewCostCheck(prom, 0, 0, 0, 0, checks.Bug) }, prometheus: newSimpleProm, problems: func(uri string) []checks.Problem { @@ -284,7 +296,7 @@ func TestCostCheck(t *testing.T) { description: "7 results with 1 series max (1KB bps)", content: content, checker: func(prom *promapi.FailoverGroup) checks.RuleChecker { - return checks.NewCostCheck(prom, 1, checks.Bug) + return checks.NewCostCheck(prom, 1, 0, 0, 0, checks.Bug) }, prometheus: newSimpleProm, problems: func(uri string) []checks.Problem { @@ -333,7 +345,7 @@ func TestCostCheck(t *testing.T) { description: "6 results with 5 series max", content: content, checker: func(prom *promapi.FailoverGroup) checks.RuleChecker { - return checks.NewCostCheck(prom, 5, checks.Bug) + return checks.NewCostCheck(prom, 5, 0, 0, 0, checks.Bug) }, prometheus: newSimpleProm, problems: func(uri string) []checks.Problem { @@ -377,7 +389,7 @@ func TestCostCheck(t *testing.T) { description: "7 results 
with 5 series max / infi", content: content, checker: func(prom *promapi.FailoverGroup) checks.RuleChecker { - return checks.NewCostCheck(prom, 5, checks.Information) + return checks.NewCostCheck(prom, 5, 0, 0, 0, checks.Information) }, prometheus: newSimpleProm, problems: func(uri string) []checks.Problem { @@ -425,7 +437,7 @@ func TestCostCheck(t *testing.T) { expr: 'sum({__name__="foo"})' `, checker: func(prom *promapi.FailoverGroup) checks.RuleChecker { - return checks.NewCostCheck(prom, 0, checks.Bug) + return checks.NewCostCheck(prom, 0, 0, 0, 0, checks.Bug) }, prometheus: newSimpleProm, problems: func(uri string) []checks.Problem { @@ -470,6 +482,194 @@ func TestCostCheck(t *testing.T) { }, }, }, + { + description: "1s eval, 5s limit", + content: content, + checker: func(prom *promapi.FailoverGroup) checks.RuleChecker { + return checks.NewCostCheck(prom, 0, 0, 0, time.Second*5, checks.Bug) + }, + prometheus: newSimpleProm, + problems: func(uri string) []checks.Problem { + return []checks.Problem{ + { + Fragment: "sum(foo)", + Lines: []int{2}, + Reporter: "query/cost", + Text: costText("prom", uri, 1) + memUsageText("4.0KiB"), + Severity: checks.Information, + }, + } + }, + mocks: []*prometheusMock{ + { + conds: []requestCondition{ + requireQueryPath, + formCond{key: "query", value: `count(sum(foo))`}, + }, + resp: vectorResponse{ + samples: []*model.Sample{generateSample(map[string]string{})}, + stats: promapi.QueryStats{ + Timings: promapi.QueryTimings{ + EvalTotalTime: 1, + }, + }, + }, + }, + { + conds: []requestCondition{ + requireQueryPath, + formCond{key: "query", value: checks.BytesPerSampleQuery}, + }, + resp: vectorResponse{ + samples: []*model.Sample{ + generateSampleWithValue(map[string]string{}, 4096), + }, + }, + }, + }, + }, + { + description: "stats", + content: content, + checker: func(prom *promapi.FailoverGroup) checks.RuleChecker { + return checks.NewCostCheck(prom, 0, 100, 10, time.Second*5, checks.Bug) + }, + prometheus: newSimpleProm, + 
problems: func(uri string) []checks.Problem { + return []checks.Problem{ + { + Fragment: "sum(foo)", + Lines: []int{2}, + Reporter: "query/cost", + Text: costText("prom", uri, 1) + memUsageText("4.0KiB"), + Severity: checks.Information, + }, + { + Fragment: "sum(foo)", + Lines: []int{2}, + Reporter: "query/cost", + Text: totalSamplesText("prom", uri, 200, 100), + Severity: checks.Bug, + }, + { + Fragment: "sum(foo)", + Lines: []int{2}, + Reporter: "query/cost", + Text: peakSamplesText("prom", uri, 20, 10), + Severity: checks.Bug, + }, + { + Fragment: "sum(foo)", + Lines: []int{2}, + Reporter: "query/cost", + Text: evalDurText("prom", uri, "5s100ms", "5s"), + Severity: checks.Bug, + }, + } + }, + mocks: []*prometheusMock{ + { + conds: []requestCondition{ + requireQueryPath, + formCond{key: "query", value: `count(sum(foo))`}, + }, + resp: vectorResponse{ + samples: []*model.Sample{generateSample(map[string]string{})}, + stats: promapi.QueryStats{ + Timings: promapi.QueryTimings{ + EvalTotalTime: 5.1, + }, + Samples: promapi.QuerySamples{ + TotalQueryableSamples: 200, + PeakSamples: 20, + }, + }, + }, + }, + { + conds: []requestCondition{ + requireQueryPath, + formCond{key: "query", value: checks.BytesPerSampleQuery}, + }, + resp: vectorResponse{ + samples: []*model.Sample{ + generateSampleWithValue(map[string]string{}, 4096), + }, + }, + }, + }, + }, + { + description: "stats - info", + content: content, + checker: func(prom *promapi.FailoverGroup) checks.RuleChecker { + return checks.NewCostCheck(prom, 0, 100, 10, time.Second*5, checks.Information) + }, + prometheus: newSimpleProm, + problems: func(uri string) []checks.Problem { + return []checks.Problem{ + { + Fragment: "sum(foo)", + Lines: []int{2}, + Reporter: "query/cost", + Text: costText("prom", uri, 1) + memUsageText("4.0KiB"), + Severity: checks.Information, + }, + { + Fragment: "sum(foo)", + Lines: []int{2}, + Reporter: "query/cost", + Text: totalSamplesText("prom", uri, 200, 100), + Severity: 
checks.Information, + }, + { + Fragment: "sum(foo)", + Lines: []int{2}, + Reporter: "query/cost", + Text: peakSamplesText("prom", uri, 20, 10), + Severity: checks.Information, + }, + { + Fragment: "sum(foo)", + Lines: []int{2}, + Reporter: "query/cost", + Text: evalDurText("prom", uri, "5s100ms", "5s"), + Severity: checks.Information, + }, + } + }, + mocks: []*prometheusMock{ + { + conds: []requestCondition{ + requireQueryPath, + formCond{key: "query", value: `count(sum(foo))`}, + }, + resp: vectorResponse{ + samples: []*model.Sample{generateSample(map[string]string{})}, + stats: promapi.QueryStats{ + Timings: promapi.QueryTimings{ + EvalTotalTime: 5.1, + }, + Samples: promapi.QuerySamples{ + TotalQueryableSamples: 200, + PeakSamples: 20, + }, + }, + }, + }, + { + conds: []requestCondition{ + requireQueryPath, + formCond{key: "query", value: checks.BytesPerSampleQuery}, + }, + resp: vectorResponse{ + samples: []*model.Sample{ + generateSampleWithValue(map[string]string{}, 4096), + }, + }, + }, + }, + }, } runTests(t, testCases) diff --git a/internal/config/cost.go b/internal/config/cost.go index 7a74aeff..7ebce644 100644 --- a/internal/config/cost.go +++ b/internal/config/cost.go @@ -7,8 +7,11 @@ import ( ) type CostSettings struct { - MaxSeries int `hcl:"maxSeries,optional" json:"maxSeries,omitempty"` - Severity string `hcl:"severity,optional" json:"severity,omitempty"` + MaxSeries int `hcl:"maxSeries,optional" json:"maxSeries,omitempty"` + MaxPeakSamples int `hcl:"maxPeakSamples,optional" json:"maxPeakSamples,omitempty"` + MaxTotalSamples int `hcl:"maxTotalSamples,optional" json:"maxTotalSamples,omitempty"` + MaxEvaluationDuration string `hcl:"maxEvaluationDuration,optional" json:"maxEvaluationDuration,omitempty"` + Severity string `hcl:"severity,optional" json:"severity,omitempty"` } func (cs CostSettings) validate() error { @@ -20,6 +23,17 @@ func (cs CostSettings) validate() error { if cs.MaxSeries < 0 { return fmt.Errorf("maxSeries value must be >= 0") } + if 
cs.MaxTotalSamples < 0 { + return fmt.Errorf("maxTotalSamples value must be >= 0") + } + if cs.MaxPeakSamples < 0 { + return fmt.Errorf("maxPeakSamples value must be >= 0") + } + if cs.MaxEvaluationDuration != "" { + if _, err := parseDuration(cs.MaxEvaluationDuration); err != nil { + return err + } + } return nil } diff --git a/internal/config/cost_test.go b/internal/config/cost_test.go index 0a503bc4..1444118a 100644 --- a/internal/config/cost_test.go +++ b/internal/config/cost_test.go @@ -31,6 +31,24 @@ func TestCostSettings(t *testing.T) { }, err: errors.New("unknown severity: foo"), }, + { + conf: CostSettings{ + MaxPeakSamples: -1, + }, + err: errors.New("maxPeakSamples value must be >= 0"), + }, + { + conf: CostSettings{ + MaxTotalSamples: -1, + }, + err: errors.New("maxTotalSamples value must be >= 0"), + }, + { + conf: CostSettings{ + MaxEvaluationDuration: "1abc", + }, + err: errors.New(`unknown unit "abc" in duration "1abc"`), + }, } for _, tc := range testCases { diff --git a/internal/config/rule.go b/internal/config/rule.go index e10a22be..2a2fedad 100644 --- a/internal/config/rule.go +++ b/internal/config/rule.go @@ -137,10 +137,11 @@ func (rule Rule) resolveChecks(ctx context.Context, path string, r parser.Rule, if rule.Cost != nil { severity := rule.Cost.getSeverity(checks.Bug) + evalDur, _ := parseDuration(rule.Cost.MaxEvaluationDuration) for _, prom := range prometheusServers { enabled = append(enabled, checkMeta{ name: checks.CostCheckName, - check: checks.NewCostCheck(prom, rule.Cost.MaxSeries, severity), + check: checks.NewCostCheck(prom, rule.Cost.MaxSeries, rule.Cost.MaxTotalSamples, rule.Cost.MaxPeakSamples, evalDur, severity), tags: prom.Tags(), }) } diff --git a/internal/promapi/prometheus.go b/internal/promapi/prometheus.go index d8f858e0..d7e89aef 100644 --- a/internal/promapi/prometheus.go +++ b/internal/promapi/prometheus.go @@ -46,9 +46,29 @@ type queryRequest struct { type queryResult struct { value any + stats QueryStats err error } 
+type QueryTimings struct { + EvalTotalTime float64 `json:"evalTotalTime"` + ResultSortTime float64 `json:"resultSortTime"` + QueryPreparationTime float64 `json:"queryPreparationTime"` + InnerEvalTime float64 `json:"innerEvalTime"` + ExecQueueTime float64 `json:"execQueueTime"` + ExecTotalTime float64 `json:"execTotalTime"` +} + +type QuerySamples struct { + TotalQueryableSamples int `json:"totalQueryableSamples"` + PeakSamples int `json:"peakSamples"` +} + +type QueryStats struct { + Timings QueryTimings `json:"timings"` + Samples QuerySamples `json:"samples"` +} + func sanitizeURI(s string) string { u, err := url.Parse(s) if err != nil { @@ -167,7 +187,7 @@ func processJob(prom *Prometheus, job queryRequest) queryResult { cacheKey := job.query.CacheKey() if prom.cache != nil { if cached, ok := prom.cache.get(cacheKey, job.query.Endpoint()); ok { - return queryResult{value: cached} + return cached.(queryResult) } } @@ -192,7 +212,7 @@ func processJob(prom *Prometheus, job queryRequest) queryResult { } if prom.cache != nil { - prom.cache.set(cacheKey, result.value, job.query.CacheTTL()) + prom.cache.set(cacheKey, result, job.query.CacheTTL()) } return result diff --git a/internal/promapi/query.go b/internal/promapi/query.go index 8bb2a73f..b24a3459 100644 --- a/internal/promapi/query.go +++ b/internal/promapi/query.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "math" "net/http" "net/url" "time" @@ -19,6 +20,7 @@ import ( type QueryResult struct { URI string Series []Sample + Stats QueryStats } type instantQuery struct { @@ -42,6 +44,7 @@ func (q instantQuery) Run() queryResult { args := url.Values{} args.Set("query", q.expr) args.Set("timeout", q.prom.timeout.String()) + args.Set("stats", "1") resp, err := q.prom.doRequest(ctx, http.MethodPost, q.Endpoint(), args) if err != nil { qr.err = err @@ -54,8 +57,7 @@ func (q instantQuery) Run() queryResult { return qr } - samples, err := streamSamples(resp.Body) - qr.value, qr.err = samples, err + qr.value, 
qr.stats, qr.err = streamSamples(resp.Body) return qr } @@ -96,6 +98,7 @@ func (p *Prometheus) Query(ctx context.Context, expr string) (*QueryResult, erro qr := QueryResult{ URI: p.safeURI, Series: result.value.([]Sample), + Stats: result.stats, } log.Debug().Str("uri", p.safeURI).Str("query", expr).Int("series", len(qr.Series)).Msg("Parsed response") @@ -107,7 +110,7 @@ type Sample struct { Value float64 } -func streamSamples(r io.Reader) (samples []Sample, err error) { +func streamSamples(r io.Reader) (samples []Sample, stats QueryStats, err error) { defer dummyReadAll(r) var status, resultType, errType, errText string @@ -137,21 +140,51 @@ func streamSamples(r io.Reader) (samples []Sample, err error) { sample.Metric = model.Metric{} }, )), + current.Key("stats", current.Object( + current.Key("timings", current.Object( + current.Key("evalTotalTime", current.Value(func(v float64, isNil bool) { + stats.Timings.EvalTotalTime = v + })), + current.Key("resultSortTime", current.Value(func(v float64, isNil bool) { + stats.Timings.ResultSortTime = v + })), + current.Key("queryPreparationTime", current.Value(func(v float64, isNil bool) { + stats.Timings.QueryPreparationTime = v + })), + current.Key("innerEvalTime", current.Value(func(v float64, isNil bool) { + stats.Timings.InnerEvalTime = v + })), + current.Key("execQueueTime", current.Value(func(v float64, isNil bool) { + stats.Timings.ExecQueueTime = v + })), + current.Key("execTotalTime", current.Value(func(v float64, isNil bool) { + stats.Timings.ExecTotalTime = v + })), + )), + current.Key("samples", current.Object( + current.Key("totalQueryableSamples", current.Value(func(v float64, isNil bool) { + stats.Samples.TotalQueryableSamples = int(math.Round(v)) + })), + current.Key("peakSamples", current.Value(func(v float64, isNil bool) { + stats.Samples.PeakSamples = int(math.Round(v)) + })), + )), + )), )), ) dec := json.NewDecoder(r) if err = decoder.Stream(dec); err != nil { - return nil, APIError{Status: status, 
ErrorType: v1.ErrBadResponse, Err: fmt.Sprintf("JSON parse error: %s", err)} + return nil, stats, APIError{Status: status, ErrorType: v1.ErrBadResponse, Err: fmt.Sprintf("JSON parse error: %s", err)} } if status != "success" { - return nil, APIError{Status: status, ErrorType: decodeErrorType(errType), Err: errText} + return nil, stats, APIError{Status: status, ErrorType: decodeErrorType(errType), Err: errText} } if resultType != "vector" { - return nil, APIError{Status: status, ErrorType: v1.ErrBadResponse, Err: fmt.Sprintf("invalid result type, expected vector, got %s", resultType)} + return nil, stats, APIError{Status: status, ErrorType: v1.ErrBadResponse, Err: fmt.Sprintf("invalid result type, expected vector, got %s", resultType)} } - return samples, nil + return samples, stats, nil } diff --git a/internal/promapi/query_test.go b/internal/promapi/query_test.go index d388d23c..9462adf8 100644 --- a/internal/promapi/query_test.go +++ b/internal/promapi/query_test.go @@ -95,6 +95,30 @@ func TestQuery(t *testing.T) { "errorType":"execution", "error":"query processing would load too many samples into memory in query execution" }`)) + case "stats": + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{ + "status":"success", + "data":{ + "resultType":"vector", + "result":[{"metric":{},"value":[1614859502.068,"1"]}], + "stats": { + "timings": { + "evalTotalTime": 10.1, + "resultSortTime": 0.5, + "queryPreparationTime": 1.5, + "innerEvalTime": 0.7, + "execQueueTime": 0.01, + "execTotalTime": 5.1 + }, + "samples": { + "totalQueryableSamples": 1000, + "peakSamples": 500 + } + } + } + }`)) default: w.WriteHeader(400) w.Header().Set("Content-Type", "application/json") @@ -190,6 +214,33 @@ func TestQuery(t *testing.T) { timeout: time.Second, err: "execution: query processing would load too many samples into memory in query execution", }, + { + query: "stats", + timeout: time.Second * 5, + result: promapi.QueryResult{ + URI: 
srv.URL, + Series: []promapi.Sample{ + { + Labels: labels.EmptyLabels(), + Value: 1, + }, + }, + Stats: promapi.QueryStats{ + Timings: promapi.QueryTimings{ + EvalTotalTime: 10.1, + ResultSortTime: 0.5, + QueryPreparationTime: 1.5, + InnerEvalTime: 0.7, + ExecQueueTime: 0.01, + ExecTotalTime: 5.1, + }, + Samples: promapi.QuerySamples{ + TotalQueryableSamples: 1000, + PeakSamples: 500, + }, + }, + }, + }, } for _, tc := range testCases { @@ -209,6 +260,7 @@ func TestQuery(t *testing.T) { if qr != nil { require.Equal(t, tc.result.URI, qr.URI) require.Equal(t, tc.result.Series, qr.Series) + require.Equal(t, tc.result.Stats, qr.Stats) } }) } diff --git a/internal/promapi/range.go b/internal/promapi/range.go index daed00b3..194f5c61 100644 --- a/internal/promapi/range.go +++ b/internal/promapi/range.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "math" "net/http" "net/url" "sort" @@ -24,6 +25,7 @@ import ( type RangeQueryResult struct { URI string Series SeriesTimeRanges + Stats QueryStats } type rangeQuery struct { @@ -46,6 +48,7 @@ func (q rangeQuery) Run() queryResult { args.Set("end", formatTime(q.r.End)) args.Set("step", strconv.FormatFloat(q.r.Step.Seconds(), 'f', -1, 64)) args.Set("timeout", q.prom.timeout.String()) + args.Set("stats", "1") resp, err := q.prom.doRequest(ctx, http.MethodPost, q.Endpoint(), args) if err != nil { qr.err = err @@ -58,9 +61,10 @@ func (q rangeQuery) Run() queryResult { return qr } - ranges, err := streamSampleStream(resp.Body, q.r.Step) + var ranges MetricTimeRanges + ranges, qr.stats, qr.err = streamSampleStream(resp.Body, q.r.Step) ExpandRangesEnd(ranges, q.r.Step) - qr.value, qr.err = ranges, err + qr.value = ranges return qr } @@ -175,6 +179,14 @@ func (p *Prometheus) RangeQuery(ctx context.Context, expr string, params RangeQu continue } merged.Series.Ranges = append(merged.Series.Ranges, result.value.(MetricTimeRanges)...) 
+ merged.Stats.Samples.PeakSamples += result.stats.Samples.PeakSamples + merged.Stats.Samples.TotalQueryableSamples += result.stats.Samples.TotalQueryableSamples + merged.Stats.Timings.EvalTotalTime += result.stats.Timings.EvalTotalTime + merged.Stats.Timings.ExecQueueTime += result.stats.Timings.ExecQueueTime + merged.Stats.Timings.ExecTotalTime += result.stats.Timings.ExecTotalTime + merged.Stats.Timings.InnerEvalTime += result.stats.Timings.InnerEvalTime + merged.Stats.Timings.QueryPreparationTime += result.stats.Timings.QueryPreparationTime + merged.Stats.Timings.ResultSortTime += result.stats.Timings.ResultSortTime wg.Done() } if len(merged.Series.Ranges) > 1 { @@ -293,7 +305,7 @@ func (ar AbsoluteRange) String() string { output.HumanizeDuration(ar.step)) } -func streamSampleStream(r io.Reader, step time.Duration) (dst MetricTimeRanges, err error) { +func streamSampleStream(r io.Reader, step time.Duration) (dst MetricTimeRanges, stats QueryStats, err error) { defer dummyReadAll(r) var status, errType, errText, resultType string @@ -321,21 +333,51 @@ func streamSampleStream(r io.Reader, step time.Duration) (dst MetricTimeRanges, sample.Values = make([]model.SamplePair, 0, len(sample.Values)) }, )), + current.Key("stats", current.Object( + current.Key("timings", current.Object( + current.Key("evalTotalTime", current.Value(func(v float64, isNil bool) { + stats.Timings.EvalTotalTime = v + })), + current.Key("resultSortTime", current.Value(func(v float64, isNil bool) { + stats.Timings.ResultSortTime = v + })), + current.Key("queryPreparationTime", current.Value(func(v float64, isNil bool) { + stats.Timings.QueryPreparationTime = v + })), + current.Key("innerEvalTime", current.Value(func(v float64, isNil bool) { + stats.Timings.InnerEvalTime = v + })), + current.Key("execQueueTime", current.Value(func(v float64, isNil bool) { + stats.Timings.ExecQueueTime = v + })), + current.Key("execTotalTime", current.Value(func(v float64, isNil bool) { + 
stats.Timings.ExecTotalTime = v + })), + )), + current.Key("samples", current.Object( + current.Key("totalQueryableSamples", current.Value(func(v float64, isNil bool) { + stats.Samples.TotalQueryableSamples = int(math.Round(v)) + })), + current.Key("peakSamples", current.Value(func(v float64, isNil bool) { + stats.Samples.PeakSamples = int(math.Round(v)) + })), + )), + )), )), ) dec := json.NewDecoder(r) if err = decoder.Stream(dec); err != nil { - return nil, APIError{Status: status, ErrorType: v1.ErrBadResponse, Err: fmt.Sprintf("JSON parse error: %s", err)} + return nil, stats, APIError{Status: status, ErrorType: v1.ErrBadResponse, Err: fmt.Sprintf("JSON parse error: %s", err)} } if status != "success" { - return nil, APIError{Status: status, ErrorType: decodeErrorType(errType), Err: errText} + return nil, stats, APIError{Status: status, ErrorType: decodeErrorType(errType), Err: errText} } if resultType != "matrix" { - return nil, APIError{Status: status, ErrorType: v1.ErrBadResponse, Err: fmt.Sprintf("invalid result type, expected matrix, got %s", resultType)} + return nil, stats, APIError{Status: status, ErrorType: v1.ErrBadResponse, Err: fmt.Sprintf("invalid result type, expected matrix, got %s", resultType)} } - return dst, nil + return dst, stats, nil } diff --git a/internal/promapi/range_test.go b/internal/promapi/range_test.go index 4147cfa9..71fd870d 100644 --- a/internal/promapi/range_test.go +++ b/internal/promapi/range_test.go @@ -27,6 +27,7 @@ func TestRange(t *testing.T) { step time.Duration timeout time.Duration out promapi.SeriesTimeRanges + stats promapi.QueryStats err string handler handlerFunc } @@ -543,6 +544,90 @@ func TestRange(t *testing.T) { }`)) }, }, + { + query: "stats", + start: timeParse("2022-06-14T00:00:00Z"), + end: timeParse("2022-06-14T07:00:00Z"), + step: time.Minute, + timeout: time.Second, + out: promapi.SeriesTimeRanges{ + Ranges: promapi.MetricTimeRanges{ + { + Labels: labels.FromStrings("instance", "1"), + Start: 
timeParse("2022-06-14T00:00:00Z"), + End: timeParse("2022-06-14T06:59:59Z"), + }, + }, + }, + stats: promapi.QueryStats{ + Timings: promapi.QueryTimings{ + EvalTotalTime: 10.1 * 4, + ResultSortTime: 0.5 * 4, + QueryPreparationTime: 1.5 * 4, + InnerEvalTime: 0.7 * 4, + ExecQueueTime: 0.01 * 4, + ExecTotalTime: 5.1 * 4, + }, + Samples: promapi.QuerySamples{ + TotalQueryableSamples: 1000 * 4, + PeakSamples: 500 * 4, + }, + }, + handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + err := r.ParseForm() + if err != nil { + t.Fatal(err) + } + require.Equal(t, r.Form.Get("query"), "stats") + require.Equal(t, r.Form.Get("step"), "60") + + start, _ := strconv.ParseFloat(r.Form.Get("start"), 64) + end, _ := strconv.ParseFloat(r.Form.Get("end"), 64) + + switch start { + case float64(timeParse("2022-06-14T00:00:00Z").Unix()): + require.Equal(t, float64(timeParse("2022-06-14T01:59:59Z").Unix()), end, "invalid end for #0") + case float64(timeParse("2022-06-14T02:00:00Z").Unix()): + require.Equal(t, float64(timeParse("2022-06-14T03:59:59Z").Unix()), end, "invalid end for #1") + case float64(timeParse("2022-06-14T04:00:00Z").Unix()): + require.Equal(t, float64(timeParse("2022-06-14T05:59:59Z").Unix()), end, "invalid end for #2") + case float64(timeParse("2022-06-14T06:00:00Z").Unix()): + require.Equal(t, float64(timeParse("2022-06-14T07:00:00Z").Unix()), end, "invalid end for #3") + default: + t.Fatalf("unknown start: %.2f", start) + } + + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json") + var values []string + for i := start; i < end; i += 60 { + values = append(values, fmt.Sprintf(`[%3f,"1"]`, i)) + } + _, _ = w.Write([]byte(fmt.Sprintf( + `{ + "status":"success", + "data":{ + "resultType":"matrix", + "result":[{"metric":{"instance":"1"}, "values":[%s]}], + "stats": { + "timings": { + "evalTotalTime": 10.1, + "resultSortTime": 0.5, + "queryPreparationTime": 1.5, + "innerEvalTime": 0.7, + "execQueueTime": 0.01, + "execTotalTime": 5.1 + }, 
+ "samples": { + "totalQueryableSamples": 1000, + "peakSamples": 500 + } + } + } + }`, + strings.Join(values, ",")))) + }, + }, } for _, tc := range testCases { @@ -566,6 +651,7 @@ func TestRange(t *testing.T) { } else { require.NoError(t, err) require.Equal(t, printRange(tc.out.Ranges), printRange(qr.Series.Ranges), tc) + require.Equal(t, tc.stats, qr.Stats) } }) }