Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade SDK v1.5.0 and add query concurrency #19

Merged
merged 3 commits into from
Oct 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version-file: "go.mod"
- name: Format
run: diff -u <(echo -n) <(gofmt -d -s .)
- name: Vet
Expand Down
90 changes: 87 additions & 3 deletions configuration/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
)

var bannedLabels = []string{"__name__"}
var nativeQueryVariableRegex = regexp.MustCompile(`\$\{?([a-zA-Z_]\w+)\}?`)

type ExcludeLabels struct {
Regex *regexp.Regexp
Expand Down Expand Up @@ -172,12 +173,20 @@ func (uc *updateCommand) validateNativeQueries(ctx context.Context) error {
return nil
}

newNativeQueries := make(map[string]metadata.NativeQuery)
for key, nativeQuery := range uc.Config.Metadata.NativeOperations.Queries {
if _, ok := uc.Config.Metadata.Metrics[key]; ok {
return fmt.Errorf("duplicated native query name `%s`. That name may exist in the metrics collection", key)
}
slog.Debug(key, slog.String("type", "native_query"), slog.String("query", nativeQuery.Query))
args, err := findNativeQueryVariables(nativeQuery)
if err != nil {
return fmt.Errorf("%s; query: %s", err, nativeQuery.Query)
}
nativeQuery.Arguments = args
query := nativeQuery.Query

// validate arguments and promQL syntaxes
for k, v := range nativeQuery.Arguments {
switch v.Type {
case string(metadata.ScalarInt64), string(metadata.ScalarFloat64):
Expand All @@ -188,12 +197,23 @@ func (uc *updateCommand) validateNativeQueries(ctx context.Context) error {
return fmt.Errorf("invalid argument type `%s` in the native query `%s`", k, key)
}
}
_, err := uc.Client.FormatQuery(ctx, query)
_, err = uc.Client.FormatQuery(ctx, query)
if err != nil {
return fmt.Errorf("invalid native query %s: %s", key, err)
}

// format and replace $<name> to ${<name>}
query, err = formatNativeQueryVariables(nativeQuery.Query, nativeQuery.Arguments)
if err != nil {
return err
}

nativeQuery.Query = query
newNativeQueries[key] = nativeQuery
}

uc.Config.Metadata.NativeOperations.Queries = newNativeQueries

return nil
}

Expand Down Expand Up @@ -231,8 +251,9 @@ var defaultConfiguration = metadata.Configuration{
NativeOperations: metadata.NativeOperations{},
},
Runtime: metadata.RuntimeSettings{
Flat: false,
UnixTimeUnit: client.UnixTimeSecond,
Flat: false,
UnixTimeUnit: client.UnixTimeSecond,
ConcurrencyLimit: 5,
Format: metadata.RuntimeFormatSettings{
Timestamp: metadata.TimestampUnix,
Value: metadata.ValueFloat64,
Expand All @@ -256,3 +277,66 @@ func validateRegularExpressions(patterns []*regexp.Regexp, input string) bool {
}
return false
}

func findNativeQueryVariables(nq metadata.NativeQuery) (map[string]metadata.NativeQueryArgumentInfo, error) {
result := map[string]metadata.NativeQueryArgumentInfo{}
matches := nativeQueryVariableRegex.FindAllStringSubmatchIndex(nq.Query, -1)
if len(matches) == 0 {
return result, nil
}

queryLength := len(nq.Query)
for _, match := range matches {
if len(match) < 4 {
continue
}
name := nq.Query[match[2]:match[3]]
argumentInfo := metadata.NativeQueryArgumentInfo{}

if match[0] > 0 && nq.Query[match[0]-1] == '[' {
// duration variables should be bounded by square brackets
if match[1] >= queryLength-1 || nq.Query[match[1]] != ']' {
return nil, fmt.Errorf("invalid promQL range syntax")
}
argumentInfo.Type = string(metadata.ScalarDuration)
} else if match[0] > 0 && nq.Query[match[0]-1] == '"' {
// duration variables should be bounded by double quotes
if match[1] >= queryLength-1 || nq.Query[match[1]] != '"' {
return nil, fmt.Errorf("invalid promQL string syntax")
}
argumentInfo.Type = string(metadata.ScalarString)
}

if len(nq.Arguments) > 0 {
// merge the existing argument from the configuration file
arg, ok := nq.Arguments[name]
if ok {
argumentInfo.Description = arg.Description
if argumentInfo.Type == "" && arg.Type != "" {
argumentInfo.Type = arg.Type
}
}
}
if argumentInfo.Type == "" {
argumentInfo.Type = string(metadata.ScalarString)
}

result[name] = argumentInfo
}

return result, nil
}

func formatNativeQueryVariables(queryInput string, variables map[string]metadata.NativeQueryArgumentInfo) (string, error) {
query := queryInput
for key := range variables {
rawPattern := fmt.Sprintf(`\$\{?%s\}?`, key)
rg, err := regexp.Compile(rawPattern)
if err != nil {
return "", fmt.Errorf("failed to compile regular expression %s, query: %s, error: %s", rawPattern, queryInput, err)
}
query = rg.ReplaceAllLiteralString(query, fmt.Sprintf("${%s}", key))
}

return query, nil
}
121 changes: 121 additions & 0 deletions configuration/update_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package main

import (
"testing"

"github.com/hasura/ndc-prometheus/connector/metadata"
"gotest.tools/v3/assert"
)

func TestNativeQueryVariables(t *testing.T) {
testCases := []struct {
Input metadata.NativeQuery
ExpectedArguments map[string]metadata.NativeQueryArgumentInfo
ExpectedQuery string
ErrorMsg string
}{
{
Input: metadata.NativeQuery{
Query: "up",
},
ExpectedArguments: map[string]metadata.NativeQueryArgumentInfo{},
ExpectedQuery: "up",
},
{
Input: metadata.NativeQuery{
Query: `up{job="${job}", instance="$instance"}`,
},
ExpectedArguments: map[string]metadata.NativeQueryArgumentInfo{
"job": {
Type: string(metadata.ScalarString),
},
"instance": {
Type: string(metadata.ScalarString),
},
},
ExpectedQuery: `up{job="${job}", instance="${instance}"}`,
},
{
Input: metadata.NativeQuery{
Query: `rate(up{job="${job}", instance="$instance"}[$range])`,
Arguments: map[string]metadata.NativeQueryArgumentInfo{},
},
ExpectedArguments: map[string]metadata.NativeQueryArgumentInfo{
"job": {
Type: string(metadata.ScalarString),
},
"instance": {
Type: string(metadata.ScalarString),
},
"range": {
Type: "Duration",
},
},
ExpectedQuery: `rate(up{job="${job}", instance="${instance}"}[${range}])`,
},
{
Input: metadata.NativeQuery{
Query: `up{job="${job}"} > $value`,
Arguments: map[string]metadata.NativeQueryArgumentInfo{
"value": {
Type: string(metadata.ScalarFloat64),
},
},
},
ExpectedArguments: map[string]metadata.NativeQueryArgumentInfo{
"job": {
Type: string(metadata.ScalarString),
},
"value": {
Type: string(metadata.ScalarFloat64),
},
},
ExpectedQuery: `up{job="${job}"} > ${value}`,
},
{
Input: metadata.NativeQuery{
Query: `up{job="${job}"} > $value`,
Arguments: map[string]metadata.NativeQueryArgumentInfo{
"value": {},
},
},
ExpectedArguments: map[string]metadata.NativeQueryArgumentInfo{
"job": {
Type: string(metadata.ScalarString),
},
"value": {
Type: string(metadata.ScalarString),
},
},
ExpectedQuery: `up{job="${job}"} > ${value}`,
},
{
Input: metadata.NativeQuery{
Query: "up[$range",
},
ErrorMsg: "invalid promQL range syntax",
},
{
Input: metadata.NativeQuery{
Query: `up{job="$job}`,
},
ErrorMsg: "invalid promQL string syntax",
},
}

for _, tc := range testCases {
t.Run(tc.Input.Query, func(t *testing.T) {
arguments, err := findNativeQueryVariables(tc.Input)
if tc.ErrorMsg != "" {
assert.ErrorContains(t, err, tc.ErrorMsg)
return
}

assert.NilError(t, err)
assert.DeepEqual(t, arguments, tc.ExpectedArguments)
query, err := formatNativeQueryVariables(tc.Input.Query, tc.ExpectedArguments)
assert.NilError(t, err)
assert.Equal(t, query, tc.ExpectedQuery)
})
}
}
78 changes: 64 additions & 14 deletions connector/client/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
"reflect"
"strings"
"time"

"github.com/hasura/ndc-sdk-go/utils"
Expand Down Expand Up @@ -40,18 +41,8 @@ func evalStepFromRange(start time.Time, end time.Time) time.Duration {
return time.Second
case difference <= time.Hour:
return time.Minute
case difference <= 3*time.Hour:
return 5 * time.Minute
case difference <= 6*time.Hour:
return 10 * time.Minute
case difference <= 12*time.Hour:
return 30 * time.Minute
case difference <= 24*time.Hour:
return time.Hour
case difference <= maxSteps*24*time.Hour:
return 24 * time.Hour
default:
return 30 * 24 * time.Hour
return difference / 60
}
}

Expand All @@ -62,7 +53,11 @@ func ParseDuration(value any, unixTimeUnit UnixTimeUnit) (time.Duration, error)
return 0, nil
}

kind := reflectValue.Kind()
return parseDurationReflection(reflectValue, reflectValue.Kind(), unixTimeUnit)
}

// parseDurationReflection parses duration from a reflection value
func parseDurationReflection(reflectValue reflect.Value, kind reflect.Kind, unixTimeUnit UnixTimeUnit) (time.Duration, error) {
switch kind {
case reflect.Invalid:
return 0, nil
Expand All @@ -85,6 +80,59 @@ func ParseDuration(value any, unixTimeUnit UnixTimeUnit) (time.Duration, error)
}
}

// RangeResolution represents the given range and resolution with format xx:xx
type RangeResolution struct {
Range model.Duration
Resolution model.Duration
}

// String implements the fmt.Stringer interface
func (rr RangeResolution) String() string {
if rr.Resolution == 0 {
return rr.Range.String()
}
return fmt.Sprintf("%s:%s", rr.Range.String(), rr.Resolution.String())
}

// ParseRangeResolution parses the range resolution from a string
func ParseRangeResolution(input any, unixTimeUnit UnixTimeUnit) (*RangeResolution, error) {
reflectValue, ok := utils.UnwrapPointerFromReflectValue(reflect.ValueOf(input))
if !ok {
return nil, nil
}

kind := reflectValue.Kind()
if kind != reflect.String {
rng, err := parseDurationReflection(reflectValue, kind, unixTimeUnit)
if err != nil {
return nil, fmt.Errorf("invalid range resolution %v: %s", input, err)
}
return &RangeResolution{Range: model.Duration(rng)}, nil
}

parts := strings.Split(reflectValue.String(), ":")
if parts[0] == "" {
return nil, fmt.Errorf("invalid range resolution %v", input)
}

rng, err := model.ParseDuration(parts[0])
if err != nil {
return nil, fmt.Errorf("invalid duration %s: %s", parts[0], err)
}

result := &RangeResolution{
Range: rng,
}
if len(parts) > 1 {
resolution, err := model.ParseDuration(parts[1])
if err != nil {
return nil, fmt.Errorf("invalid resolution %s: %s", parts[1], err)
}
result.Resolution = resolution
}
return result, nil
}

// ParseTimestamp parses timestamp from an unknown value
func ParseTimestamp(s any, unixTimeUnit UnixTimeUnit) (*time.Time, error) {
reflectValue, ok := utils.UnwrapPointerFromReflectValue(reflect.ValueOf(s))
Expand All @@ -104,8 +152,10 @@ func ParseTimestamp(s any, unixTimeUnit UnixTimeUnit) (*time.Time, error) {
return &now, nil
}
// Input timestamps may be provided either in RFC3339 format
if t, err := time.Parse(time.RFC3339, strValue); err == nil {
return &t, nil
for _, format := range []string{time.RFC3339, "2006-01-02T15:04:05Z0700", "2006-01-02T15:04:05-0700", time.RFC3339Nano, time.DateOnly} {
if t, err := time.Parse(format, strValue); err == nil {
return &t, nil
}
}
if d, err := time.ParseDuration(strValue); err == nil {
result := time.Now().Add(-d)
Expand Down
Loading