Skip to content

Commit

Permalink
upgrade SDK v1.5.0 and add query concurrency (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
hgiasac authored Oct 6, 2024
1 parent 3d45dd1 commit af12838
Show file tree
Hide file tree
Showing 17 changed files with 433 additions and 144 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version-file: "go.mod"
- name: Format
run: diff -u <(echo -n) <(gofmt -d -s .)
- name: Vet
Expand Down
90 changes: 87 additions & 3 deletions configuration/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
)

var bannedLabels = []string{"__name__"}
var nativeQueryVariableRegex = regexp.MustCompile(`\$\{?([a-zA-Z_]\w+)\}?`)

type ExcludeLabels struct {
Regex *regexp.Regexp
Expand Down Expand Up @@ -172,12 +173,20 @@ func (uc *updateCommand) validateNativeQueries(ctx context.Context) error {
return nil
}

newNativeQueries := make(map[string]metadata.NativeQuery)
for key, nativeQuery := range uc.Config.Metadata.NativeOperations.Queries {
if _, ok := uc.Config.Metadata.Metrics[key]; ok {
return fmt.Errorf("duplicated native query name `%s`. That name may exist in the metrics collection", key)
}
slog.Debug(key, slog.String("type", "native_query"), slog.String("query", nativeQuery.Query))
args, err := findNativeQueryVariables(nativeQuery)
if err != nil {
return fmt.Errorf("%s; query: %s", err, nativeQuery.Query)
}
nativeQuery.Arguments = args
query := nativeQuery.Query

// validate arguments and promQL syntaxes
for k, v := range nativeQuery.Arguments {
switch v.Type {
case string(metadata.ScalarInt64), string(metadata.ScalarFloat64):
Expand All @@ -188,12 +197,23 @@ func (uc *updateCommand) validateNativeQueries(ctx context.Context) error {
return fmt.Errorf("invalid argument type `%s` in the native query `%s`", k, key)
}
}
_, err := uc.Client.FormatQuery(ctx, query)
_, err = uc.Client.FormatQuery(ctx, query)
if err != nil {
return fmt.Errorf("invalid native query %s: %s", key, err)
}

// format and replace $<name> to ${<name>}
query, err = formatNativeQueryVariables(nativeQuery.Query, nativeQuery.Arguments)
if err != nil {
return err
}

nativeQuery.Query = query
newNativeQueries[key] = nativeQuery
}

uc.Config.Metadata.NativeOperations.Queries = newNativeQueries

return nil
}

Expand Down Expand Up @@ -231,8 +251,9 @@ var defaultConfiguration = metadata.Configuration{
NativeOperations: metadata.NativeOperations{},
},
Runtime: metadata.RuntimeSettings{
Flat: false,
UnixTimeUnit: client.UnixTimeSecond,
Flat: false,
UnixTimeUnit: client.UnixTimeSecond,
ConcurrencyLimit: 5,
Format: metadata.RuntimeFormatSettings{
Timestamp: metadata.TimestampUnix,
Value: metadata.ValueFloat64,
Expand All @@ -256,3 +277,66 @@ func validateRegularExpressions(patterns []*regexp.Regexp, input string) bool {
}
return false
}

func findNativeQueryVariables(nq metadata.NativeQuery) (map[string]metadata.NativeQueryArgumentInfo, error) {
result := map[string]metadata.NativeQueryArgumentInfo{}
matches := nativeQueryVariableRegex.FindAllStringSubmatchIndex(nq.Query, -1)
if len(matches) == 0 {
return result, nil
}

queryLength := len(nq.Query)
for _, match := range matches {
if len(match) < 4 {
continue
}
name := nq.Query[match[2]:match[3]]
argumentInfo := metadata.NativeQueryArgumentInfo{}

if match[0] > 0 && nq.Query[match[0]-1] == '[' {
// duration variables should be bounded by square brackets
if match[1] >= queryLength-1 || nq.Query[match[1]] != ']' {
return nil, fmt.Errorf("invalid promQL range syntax")
}
argumentInfo.Type = string(metadata.ScalarDuration)
} else if match[0] > 0 && nq.Query[match[0]-1] == '"' {
// duration variables should be bounded by double quotes
if match[1] >= queryLength-1 || nq.Query[match[1]] != '"' {
return nil, fmt.Errorf("invalid promQL string syntax")
}
argumentInfo.Type = string(metadata.ScalarString)
}

if len(nq.Arguments) > 0 {
// merge the existing argument from the configuration file
arg, ok := nq.Arguments[name]
if ok {
argumentInfo.Description = arg.Description
if argumentInfo.Type == "" && arg.Type != "" {
argumentInfo.Type = arg.Type
}
}
}
if argumentInfo.Type == "" {
argumentInfo.Type = string(metadata.ScalarString)
}

result[name] = argumentInfo
}

return result, nil
}

func formatNativeQueryVariables(queryInput string, variables map[string]metadata.NativeQueryArgumentInfo) (string, error) {
query := queryInput
for key := range variables {
rawPattern := fmt.Sprintf(`\$\{?%s\}?`, key)
rg, err := regexp.Compile(rawPattern)
if err != nil {
return "", fmt.Errorf("failed to compile regular expression %s, query: %s, error: %s", rawPattern, queryInput, err)
}
query = rg.ReplaceAllLiteralString(query, fmt.Sprintf("${%s}", key))
}

return query, nil
}
121 changes: 121 additions & 0 deletions configuration/update_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package main

import (
"testing"

"github.com/hasura/ndc-prometheus/connector/metadata"
"gotest.tools/v3/assert"
)

func TestNativeQueryVariables(t *testing.T) {
testCases := []struct {
Input metadata.NativeQuery
ExpectedArguments map[string]metadata.NativeQueryArgumentInfo
ExpectedQuery string
ErrorMsg string
}{
{
Input: metadata.NativeQuery{
Query: "up",
},
ExpectedArguments: map[string]metadata.NativeQueryArgumentInfo{},
ExpectedQuery: "up",
},
{
Input: metadata.NativeQuery{
Query: `up{job="${job}", instance="$instance"}`,
},
ExpectedArguments: map[string]metadata.NativeQueryArgumentInfo{
"job": {
Type: string(metadata.ScalarString),
},
"instance": {
Type: string(metadata.ScalarString),
},
},
ExpectedQuery: `up{job="${job}", instance="${instance}"}`,
},
{
Input: metadata.NativeQuery{
Query: `rate(up{job="${job}", instance="$instance"}[$range])`,
Arguments: map[string]metadata.NativeQueryArgumentInfo{},
},
ExpectedArguments: map[string]metadata.NativeQueryArgumentInfo{
"job": {
Type: string(metadata.ScalarString),
},
"instance": {
Type: string(metadata.ScalarString),
},
"range": {
Type: "Duration",
},
},
ExpectedQuery: `rate(up{job="${job}", instance="${instance}"}[${range}])`,
},
{
Input: metadata.NativeQuery{
Query: `up{job="${job}"} > $value`,
Arguments: map[string]metadata.NativeQueryArgumentInfo{
"value": {
Type: string(metadata.ScalarFloat64),
},
},
},
ExpectedArguments: map[string]metadata.NativeQueryArgumentInfo{
"job": {
Type: string(metadata.ScalarString),
},
"value": {
Type: string(metadata.ScalarFloat64),
},
},
ExpectedQuery: `up{job="${job}"} > ${value}`,
},
{
Input: metadata.NativeQuery{
Query: `up{job="${job}"} > $value`,
Arguments: map[string]metadata.NativeQueryArgumentInfo{
"value": {},
},
},
ExpectedArguments: map[string]metadata.NativeQueryArgumentInfo{
"job": {
Type: string(metadata.ScalarString),
},
"value": {
Type: string(metadata.ScalarString),
},
},
ExpectedQuery: `up{job="${job}"} > ${value}`,
},
{
Input: metadata.NativeQuery{
Query: "up[$range",
},
ErrorMsg: "invalid promQL range syntax",
},
{
Input: metadata.NativeQuery{
Query: `up{job="$job}`,
},
ErrorMsg: "invalid promQL string syntax",
},
}

for _, tc := range testCases {
t.Run(tc.Input.Query, func(t *testing.T) {
arguments, err := findNativeQueryVariables(tc.Input)
if tc.ErrorMsg != "" {
assert.ErrorContains(t, err, tc.ErrorMsg)
return
}

assert.NilError(t, err)
assert.DeepEqual(t, arguments, tc.ExpectedArguments)
query, err := formatNativeQueryVariables(tc.Input.Query, tc.ExpectedArguments)
assert.NilError(t, err)
assert.Equal(t, query, tc.ExpectedQuery)
})
}
}
78 changes: 64 additions & 14 deletions connector/client/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
"reflect"
"strings"
"time"

"github.com/hasura/ndc-sdk-go/utils"
Expand Down Expand Up @@ -40,18 +41,8 @@ func evalStepFromRange(start time.Time, end time.Time) time.Duration {
return time.Second
case difference <= time.Hour:
return time.Minute
case difference <= 3*time.Hour:
return 5 * time.Minute
case difference <= 6*time.Hour:
return 10 * time.Minute
case difference <= 12*time.Hour:
return 30 * time.Minute
case difference <= 24*time.Hour:
return time.Hour
case difference <= maxSteps*24*time.Hour:
return 24 * time.Hour
default:
return 30 * 24 * time.Hour
return difference / 60
}
}

Expand All @@ -62,7 +53,11 @@ func ParseDuration(value any, unixTimeUnit UnixTimeUnit) (time.Duration, error)
return 0, nil
}

kind := reflectValue.Kind()
return parseDurationReflection(reflectValue, reflectValue.Kind(), unixTimeUnit)
}

// parseDurationReflection parses duration from a reflection value
func parseDurationReflection(reflectValue reflect.Value, kind reflect.Kind, unixTimeUnit UnixTimeUnit) (time.Duration, error) {
switch kind {
case reflect.Invalid:
return 0, nil
Expand All @@ -85,6 +80,59 @@ func ParseDuration(value any, unixTimeUnit UnixTimeUnit) (time.Duration, error)
}
}

// RangeResolution represents the given range and resolution with format xx:xx
type RangeResolution struct {
Range model.Duration
Resolution model.Duration
}

// String implements the fmt.Stringer interface
func (rr RangeResolution) String() string {
if rr.Resolution == 0 {
return rr.Range.String()
}
return fmt.Sprintf("%s:%s", rr.Range.String(), rr.Resolution.String())
}

// ParseRangeResolution parses the range resolution from a string
func ParseRangeResolution(input any, unixTimeUnit UnixTimeUnit) (*RangeResolution, error) {
reflectValue, ok := utils.UnwrapPointerFromReflectValue(reflect.ValueOf(input))
if !ok {
return nil, nil
}

kind := reflectValue.Kind()
if kind != reflect.String {
rng, err := parseDurationReflection(reflectValue, kind, unixTimeUnit)
if err != nil {
return nil, fmt.Errorf("invalid range resolution %v: %s", input, err)
}
return &RangeResolution{Range: model.Duration(rng)}, nil
}

parts := strings.Split(reflectValue.String(), ":")
if parts[0] == "" {
return nil, fmt.Errorf("invalid range resolution %v", input)
}

rng, err := model.ParseDuration(parts[0])
if err != nil {
return nil, fmt.Errorf("invalid duration %s: %s", parts[0], err)
}

result := &RangeResolution{
Range: rng,
}
if len(parts) > 1 {
resolution, err := model.ParseDuration(parts[1])
if err != nil {
return nil, fmt.Errorf("invalid resolution %s: %s", parts[1], err)
}
result.Resolution = resolution
}
return result, nil
}

// ParseTimestamp parses timestamp from an unknown value
func ParseTimestamp(s any, unixTimeUnit UnixTimeUnit) (*time.Time, error) {
reflectValue, ok := utils.UnwrapPointerFromReflectValue(reflect.ValueOf(s))
Expand All @@ -104,8 +152,10 @@ func ParseTimestamp(s any, unixTimeUnit UnixTimeUnit) (*time.Time, error) {
return &now, nil
}
// Input timestamps may be provided either in RFC3339 format
if t, err := time.Parse(time.RFC3339, strValue); err == nil {
return &t, nil
for _, format := range []string{time.RFC3339, "2006-01-02T15:04:05Z0700", "2006-01-02T15:04:05-0700", time.RFC3339Nano, time.DateOnly} {
if t, err := time.Parse(format, strValue); err == nil {
return &t, nil
}
}
if d, err := time.ParseDuration(strValue); err == nil {
result := time.Now().Add(-d)
Expand Down
Loading

0 comments on commit af12838

Please sign in to comment.