Skip to content

Commit

Permalink
Merge pull request #842 from go-graphite/njpm/mdp-aggregation-nudging…
Browse files Browse the repository at this point in the history
…-upstream

MaxDataPoints consolidation improvements: support nudging for consistent bucketing
  • Loading branch information
Civil authored Sep 12, 2024
2 parents ace9091 + 87600f7 commit bec3bf8
Show file tree
Hide file tree
Showing 6 changed files with 333 additions and 2 deletions.
3 changes: 3 additions & 0 deletions cmd/carbonapi/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ type ConfigType struct {
MaxQueryLength uint64 `mapstructure:"maxQueryLength"`
CombineMultipleTargetsInOne bool `mapstructure:"combineMultipleTargetsInOne"`

NudgeStartTimeOnAggregation bool `mapstructure:"nudgeStartTimeOnAggregation"`
UseBucketsHighestTimestampOnAggregation bool `mapstructure:"useBucketsHighestTimestampOnAggregation"`

ResponseCache cache.BytesCache `mapstructure:"-" json:"-"`
BackendCache cache.BytesCache `mapstructure:"-" json:"-"`

Expand Down
7 changes: 7 additions & 0 deletions cmd/carbonapi/config/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
fconfig "github.com/go-graphite/carbonapi/expr/functions/config"
"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/rewrite"
tconfig "github.com/go-graphite/carbonapi/expr/types/config"
"github.com/go-graphite/carbonapi/limiter"
"github.com/go-graphite/carbonapi/pkg/parser"
zipperTypes "github.com/go-graphite/carbonapi/zipper/types"
Expand Down Expand Up @@ -257,6 +258,9 @@ func SetUpConfig(logger *zap.Logger, BuildVersion string) {
)
}

tconfig.Config.NudgeStartTimeOnAggregation = Config.NudgeStartTimeOnAggregation
tconfig.Config.UseBucketsHighestTimestampOnAggregation = Config.UseBucketsHighestTimestampOnAggregation

if Config.Listen != "" {
listeners := make(map[string]struct{})
for _, l := range Config.Listeners {
Expand Down Expand Up @@ -401,6 +405,9 @@ func SetUpViper(logger *zap.Logger, configPath *string, exactConfig bool, viperP
viper.SetDefault("useCachingDNSResolver", false)
viper.SetDefault("logger", map[string]string{})
viper.SetDefault("combineMultipleTargetsInOne", false)
viper.SetDefault("nudgeStartTimeOnAggregation", false)
viper.SetDefault("useBucketsHighestTimestampOnAggregation", false)

viper.AutomaticEnv()

var err error
Expand Down
27 changes: 27 additions & 0 deletions doc/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ Table of Contents
* [For IRONdb](#for-irondb)
* [expireDelaySec](#expiredelaysec)
* [Example](#example-21)
* [nudgeStartTimeOnAggregation](#nudgestarttimeonaggregation)
* [useBucketsHighestTimestampOnAggregation](#usebucketshighesttimestamponaggregation)

# General configuration for carbonapi

Expand Down Expand Up @@ -940,3 +942,28 @@ Default: 600 (10 minutes)
```yaml
expireDelaySec: 10
```

***
## nudgeStartTimeOnAggregation
Enables nudging the start time of metrics when aggregating to honor MaxDataPoints.
The start time is nudged in such way that timestamps always fall in the same bucket.
This is done by GraphiteWeb, and is useful to avoid jitter in graphs when refreshing the page.

Default: false

### Example
```yaml
nudgeStartTimeOnAggregation: true
```

***
## useBucketsHighestTimestampOnAggregation
Enables using the highest timestamp of the buckets when aggregating to honor MaxDataPoints, instead of the lowest timestamp.
This prevents results to appear to predict the future.

Default: false

### Example
```yaml
useBucketsHighestTimestampOnAggregation: true
```
16 changes: 16 additions & 0 deletions expr/types/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package config

type ConfigType = struct {
// NudgeStartTimeOnAggregation enables nudging the start time of metrics
// when aggregated. The start time is nudged in such way that timestamps
// always fall in the same bucket. This is done by GraphiteWeb, and is
// useful to avoid jitter in graphs when refreshing the page.
NudgeStartTimeOnAggregation bool

// UseBucketsHighestTimestampOnAggregation enables using the highest timestamp of the
// buckets when aggregating to honor MaxDataPoints, instead of the lowest timestamp.
// This prevents results to appear to predict the future.
UseBucketsHighestTimestampOnAggregation bool
}

var Config = ConfigType{}
60 changes: 58 additions & 2 deletions expr/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/go-graphite/carbonapi/expr/consolidations"
"github.com/go-graphite/carbonapi/expr/tags"
"github.com/go-graphite/carbonapi/expr/types/config"
pbv2 "github.com/go-graphite/protocol/carbonapi_v2_pb"
pb "github.com/go-graphite/protocol/carbonapi_v3_pb"
pickle "github.com/lomik/og-rek"
Expand Down Expand Up @@ -141,7 +142,7 @@ func MarshalJSON(results []*MetricData, timestampMultiplier int64, noNullPoints
b = append(b, `,"datapoints":[`...)

var innerComma bool
t := r.StartTime * timestampMultiplier
t := r.AggregatedStartTime() * timestampMultiplier
for _, v := range r.AggregatedValues() {
if noNullPoints && math.IsNaN(v) {
t += r.AggregatedTimeStep() * timestampMultiplier
Expand Down Expand Up @@ -330,6 +331,60 @@ func (r *MetricData) AggregatedTimeStep() int64 {
return r.StepTime * int64(r.ValuesPerPoint)
}

// AggregatedStartTime returns the start time of the aggregated series.
// This can be different from the original start time if NudgeStartTimeOnAggregation
// or UseBucketsHighestTimestampOnAggregation are enabled.
func (r *MetricData) AggregatedStartTime() int64 {
start := r.StartTime + r.nudgePointsCount()*r.StepTime
if config.Config.UseBucketsHighestTimestampOnAggregation {
return start + r.AggregatedTimeStep() - r.StepTime
}
return start
}

// nudgePointsCount returns the number of points to discard at the beginning of
// the series when aggregating. This is done if NudgeStartTimeOnAggregation is
// enabled, and has the purpose of assigning timestamps of a series to buckets
// consistently across different time ranges. To simplify the aggregation
// logic, we discard points at the beginning of the series so that a bucket
// starts right at the beginning. This function calculates how many points to
// discard.
func (r *MetricData) nudgePointsCount() int64 {
if !config.Config.NudgeStartTimeOnAggregation {
return 0
}

if len(r.Values) <= 2*r.ValuesPerPoint {
// There would be less than 2 points after aggregation, removing one would be too impactful.
return 0
}

// Suppose r.StartTime=4, r.StepTime=3 and aggTimeStep=6.
// - ts: 00 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 ...
// - original buckets: -- -- --| | | | | | ...
// - aggregated buckets: -- -- --| | | ...

// We start counting our aggTimeStep buckets at absolute time r.StepTime.
// Notice the following:
// - ts: 00 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 ...
// - bucket #: - - - 1 1 1 1 1 1 2 2 2 2 2 2 3 3 ...
// - (ts-step) % aggTimeStep: - - - 0 1 2 3 4 5 0 1 2 3 4 5 0 1 ...

// Given a timestamp 'ts', we can calculate how far it is from the beginning
// of the nearest bucket to the right by doing:
// * aggTimeStep - ((ts-r.StepTime) % aggTimeStep)
// Using this, we calculate the 'distance' from r.StartTime to the
// nearest bucket to the right. If this distance is less than aggTimeStep,
// then r.StartTime is not the beginning of a bucket. We need to discard
// dist / r.StepTime points (which could be zero if dist < r.StepTime).
aggTimeStep := r.AggregatedTimeStep()
dist := aggTimeStep - ((r.StartTime - r.StepTime) % aggTimeStep)
if dist < aggTimeStep {
return dist / r.StepTime
}
return 0
}

// GetAggregateFunction returns MetricData.AggregateFunction and set it, if it's not yet
func (r *MetricData) GetAggregateFunction() func([]float64) float64 {
if r.AggregateFunction == nil {
Expand Down Expand Up @@ -363,7 +418,8 @@ func (r *MetricData) AggregateValues() {
n := len(r.Values)/r.ValuesPerPoint + 1
aggV := make([]float64, 0, n)

v := r.Values
nudgeCount := r.nudgePointsCount()
v := r.Values[nudgeCount:]

for len(v) >= r.ValuesPerPoint {
val := aggFunc(v[:r.ValuesPerPoint])
Expand Down
Loading

0 comments on commit bec3bf8

Please sign in to comment.