Skip to content

Commit

Permalink
refactor: getAggregationBucketMinute and create floorFactor function
Browse files Browse the repository at this point in the history
  • Loading branch information
koladilip committed Dec 13, 2024
1 parent 8e3310d commit ac0e7b8
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 12 deletions.
28 changes: 28 additions & 0 deletions enterprise/reporting/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,34 @@ func TestTransformMetricWithEventSamplingWithNilEventSampler(t *testing.T) {
require.Equal(t, sampleResponse, transformedMetric.StatusDetail.SampleResponse)
}

func TestFloorFactor(t *testing.T) {
tests := []struct {
name string
intervalMs int64
expected int64
}{
// Edge cases
{name: "Smaller than smallest factor", intervalMs: 0, expected: 1},
{name: "Exact match for smallest factor", intervalMs: 1, expected: 1},
{name: "Exact match for largest factor", intervalMs: 60, expected: 60},
{name: "Larger than largest factor", intervalMs: 100, expected: 60},

// Typical cases
{name: "Between 10 and 12", intervalMs: 11, expected: 10},
{name: "Between 4 and 6", intervalMs: 5, expected: 5},
{name: "Between 20 and 30", intervalMs: 25, expected: 20},
{name: "Exact match in the middle", intervalMs: 30, expected: 30},
{name: "Exact match at a non-boundary point", intervalMs: 12, expected: 12},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
result := floorFactor(test.intervalMs)
require.Equal(t, test.expected, result)
})
}
}

func TestTransformMetricWithEventSampling(t *testing.T) {
sampleEvent := []byte(`{"event": "2"}`)
sampleResponse := "sample response"
Expand Down
36 changes: 24 additions & 12 deletions enterprise/reporting/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,46 @@ package reporting

import (
"encoding/json"
"sort"
"strings"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-server/enterprise/reporting/event_sampler"
"github.com/rudderlabs/rudder-server/utils/types"
)

func floorFactor(intervalMs int64) int64 {
factors := []int64{1, 2, 3, 4, 5, 6, 10, 12, 15, 20, 30, 60}

// Find the smallest index where factors[i] >= intervalMs
index := sort.Search(len(factors), func(i int) bool {
return factors[i] >= intervalMs
})

// If index is 0, intervalMs is smaller than the smallest factor
if index == 0 {
return factors[0]
}

// If factors[index] == intervalMs, return it directly
if index < len(factors) && factors[index] == intervalMs {
return factors[index]
}

// Otherwise, return the previous factor
return factors[index-1]
}

func getAggregationBucketMinute(timeMs, intervalMs int64) (int64, int64) {
// If interval is not a factor of 60, then the bucket start will not be aligned to hour start
// For example, if intervalMs is 7, and timeMs is 28891085 (6:05) then the bucket start will be 28891079 (5:59)
// and current bucket will contain the data of 2 different hourly buckets, which is should not have happened.
// To avoid this, we round the intervalMs to the nearest factor of 60.
if intervalMs <= 0 || 60%intervalMs != 0 {
factors := []int64{1, 2, 3, 4, 5, 6, 10, 12, 15, 20, 30, 60}
closestFactor := factors[0]
for _, factor := range factors {
if factor < intervalMs {
closestFactor = factor
} else {
break
}
}
intervalMs = closestFactor
intervalMs = floorFactor(intervalMs)
}

bucketStart := timeMs - (timeMs % intervalMs)
bucketEnd := bucketStart + intervalMs

return bucketStart, bucketEnd
}

Expand Down

0 comments on commit ac0e7b8

Please sign in to comment.