Skip to content

Commit

Permalink
fix: skip points outside of archive retention on first pass (#82)
Browse files Browse the repository at this point in the history
Previously, we kept points from a lower-resolution archive if their timestamps did not exist in a higher-resolution archive. This was incorrect because the lower-resolution archive might use a SUM aggregation, which results in spike points amidst the original high-res points. Instead, we now only take points from an archive if there is no higher-resolution archive covering that time span. Similarly, we do not keep any points in a high-res archive if those points are covered by a lower-res archive. In effect, we always know which points are valid based on the boundaries between the archives:

```
   MaxT     MinT
0: [XXXXXXXXXX] 1d, 1m
1: [           XXXXXXXXXXXXXXXXX] 1w, 10m
2: [                            XXXXXXXXXXXXX]  1y, 60m
```

Based on this new approach, we can greatly reduce the amount of logic in the ReadSamples function and make it much faster and more space-efficient. We also discovered new edge cases and added tests for those. 

Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com>
Co-authored-by: Owen Williams <owen.williams@grafana.com>
  • Loading branch information
jesusvazquez and ywwg authored Sep 28, 2023
1 parent 96e0cdc commit b4b782c
Show file tree
Hide file tree
Showing 5 changed files with 375 additions and 76 deletions.
13 changes: 12 additions & 1 deletion pkg/graphite/convert/whisperconverter/daterange.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package whisperconverter
import (
"fmt"
"math"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -89,6 +90,16 @@ READLOOP:
}
}

fmt.Printf("--start-date %s --end-date %s\n", time.UnixMilli(minTS).Format("2006-01-02"), time.UnixMilli(maxTS).Format("2006-01-02"))
terms := []string{}

if minTS != int64(math.MaxInt64) {
terms = append(terms, fmt.Sprintf("--start-date %s", time.UnixMilli(minTS).Format("2006-01-02")))
}
if maxTS != int64(math.MinInt64) {
terms = append(terms, fmt.Sprintf("--end-date %s", time.UnixMilli(maxTS).Format("2006-01-02")))
}
terms = append(terms, "\n")
fmt.Printf(strings.Join(terms, " "))

wg.Done()
}
16 changes: 13 additions & 3 deletions pkg/graphite/convert/whisperconverter/daterange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,27 @@ import (
"os"
"regexp"
"testing"
"time"

"github.com/go-graphite/go-whisper"
"github.com/go-kit/log"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
)

// This test contains a data race due to how we take over STDOUT, but it should be harmless.
func TestCommandDateRange(t *testing.T) {
tmpDir, err := os.MkdirTemp("/tmp", "testCommandDateRange*")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)
// tmpDir, err := os.MkdirTemp("/tmp", "testCommandDateRange*")
// require.NoError(t, err)
// defer os.RemoveAll(tmpDir)
whisper.Now = func() time.Time {
t, err := time.Parse("2006-01-02", "2022-06-01")
if err != nil {
panic(err)
}
return t
}
tmpDir := "/tmp"

fooTimes, err := ToTimes([]string{
"2022-05-01",
Expand Down
12 changes: 12 additions & 0 deletions pkg/graphite/convert/whisperconverter/test_utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package whisperconverter

import (
"fmt"
"os"
"time"

Expand All @@ -22,14 +23,25 @@ func CreateWhisperFile(path string, timestamps []*time.Time) error {
if err != nil {
return err
}
// defer wsp.Close()

for _, t := range timestamps {
fmt.Println("yes?")
err = wsp.Update(1.0, int(t.Unix()))
if err != nil {
fmt.Println(err)
return err
}
}

wsp.Close()

// wsp, err = whisper.Open(path)
// if err != nil {
// return err
// }
// wsp.Dump(true, false)

return nil
}

Expand Down
129 changes: 86 additions & 43 deletions pkg/graphite/convert/whisperconverter/whisper.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,67 +40,110 @@ type Archive interface {
DumpArchive(int) ([]whisper.Point, error)
}

// pointWithPrecision is a whisper Point with the precision of the archive it
// came from. This is used to differentiate when we have duplicate timestamps at
// different precisions.
type pointWithPrecision struct {
whisper.Point
secondsPerPoint uint32
}

// ReadPoints reads and concatenates all of the points in a whisper Archive.
func ReadPoints(w Archive, name string) ([]whisper.Point, error) {
if len(w.GetArchives()) == 0 {
archives := w.GetArchives()
if len(archives) == 0 {
return nil, fmt.Errorf("whisper file contains no archives for metric: %q", name)
}

totalPoints := 0
for _, a := range w.GetArchives() {
totalPoints += int(a.Points)
}

// Preallocate space for all allPoints in one slice.
allPoints := make([]pointWithPrecision, totalPoints)
pIdx := 0
// Dump one precision level at a time and write into the output slice.
for i, a := range w.GetArchives() {
archivePoints, err := w.DumpArchive(i)
if err != nil {
return nil, fmt.Errorf("failed to dump archive %d from whisper metric %s", i, name)
// Its important to remember that the archive with index 0 (first archive)
// has the raw data and the highest precision https://graphite.readthedocs.io/en/latest/whisper.html#archives-retention-and-precision
keptPoints := []whisper.Point{}

// We want to track the max timestamp of the archives because we know it
// virtually represents now() and we won't have newer points.
var maxTs, maxTsOffset uint32
for i := range archives {
// All archives share the same maxTs, so only calculate it once.
if maxTs == 0 {
if i > 0 {
// If there are no points in the high-res archives, we have to bump up
// maxTs by the difference in retention to the next higher archive so
// that this point is validly in the retention for this archive. This
// can happen when the only points added to a whisper archive are
// significantly older than "Now()" at the time of writing, as happens
// during our e2e test.
maxTsOffset = archives[i-1].Retention()
}
points, err := w.DumpArchive(i)
if err != nil {
return nil, fmt.Errorf("failed to dump archive %d from whisper metric %s", i, name)
}
for _, p := range points {
if p.Timestamp > maxTs {
maxTs = p.Timestamp
}
}
}
for j, p := range archivePoints {
allPoints[pIdx+j] = pointWithPrecision{p, a.SecondsPerPoint}
}
maxTs += maxTsOffset

// Also determine the boundaries between archives.
lowerBoundTs := make([]uint32, len(archives))
for i, a := range archives {
if maxTs < a.Retention() {
// very big retention, boundary would be < 0, therefore all points are
// covered by this archive.
lowerBoundTs[i] = 0
} else {
lowerBoundTs[i] = maxTs - a.Retention()
}
pIdx += len(archivePoints)
}

// Points must be in time order.
sort.Slice(allPoints, func(i, j int) bool {
return allPoints[i].Timestamp < allPoints[j].Timestamp
})
// no maxTs means no points. This is not an error.
if maxTs == 0 {
return []whisper.Point{}, nil
}

trimmedPoints := []whisper.Point{}
for i := 0; i < len(allPoints); i++ {
// Remove all points of time = 0.
if allPoints[i].Timestamp == 0 {
// Iterate over archives backwards so we process oldest points first. Sort the
// points, then determine the slice that is between the bounds for this
// archive, and append those to the output array.
for i := len(archives) - 1; i >= 0; i-- {
points, err := w.DumpArchive(i)
if err != nil {
return nil, fmt.Errorf("failed to dump archive %d from whisper metric %s", i, name)
}

if len(points) == 0 {
continue
}
// There might be duplicate timestamps in different archives. Take the
// higher-precision archive value since it's unaggregated.
if i > 0 && allPoints[i].Timestamp == allPoints[i-1].Timestamp {
if allPoints[i].secondsPerPoint == allPoints[i-1].secondsPerPoint {
return nil, fmt.Errorf("duplicate timestamp at same precision in archive %s: %d", name, allPoints[i].Timestamp)

// Sort this archive.
sort.Slice(points, func(i, j int) bool {
return points[i].Timestamp < points[j].Timestamp
})

startIdx := -1
endIdx := len(points) - 1
for j, p := range points {
if p.Timestamp == 0 {
continue
}
if allPoints[i].secondsPerPoint < allPoints[i-1].secondsPerPoint {
trimmedPoints[len(trimmedPoints)-1] = allPoints[i].Point
// Don't include any points in this archive that are older than the
// retention period.
if p.Timestamp <= lowerBoundTs[i] {
continue
}
// If the previous point is higher precision, just continue.
continue
// Don't include any points in this archive that are covered in a higher
// resolution archive. If the other boundary is zero, it is invalid
// so we keep the point.
if i > 0 && p.Timestamp > lowerBoundTs[i-1] {
break
}
endIdx = j
if startIdx == -1 {
startIdx = j
}
}
// if startIdx is -1, we did not find any valid points.
if startIdx != -1 {
keptPoints = append(keptPoints, points[startIdx:endIdx+1]...)
}
trimmedPoints = append(trimmedPoints, allPoints[i].Point)
}

return trimmedPoints, nil
return keptPoints, nil
}

// ToMimirSamples converts a Whisper metric with the given name to a slice of
Expand Down
Loading

0 comments on commit b4b782c

Please sign in to comment.