Skip to content

Commit

Permalink
Lookout v2 API - state counts as group aggregate (#2605)
Browse files Browse the repository at this point in the history
  • Loading branch information
carlocamurri committed Jun 23, 2023
1 parent d59732a commit 5bf48cb
Show file tree
Hide file tree
Showing 14 changed files with 803 additions and 214 deletions.
12 changes: 12 additions & 0 deletions internal/common/database/lookout/jobstates.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ const (
)

var (
// JobStates is an ordered list of states
JobStates = []JobState{
JobQueued,
JobLeased,
JobPending,
JobRunning,
JobSucceeded,
JobFailed,
JobCancelled,
JobPreempted,
}

JobStateMap = map[int]JobState{
JobLeasedOrdinal: JobLeased,
JobQueuedOrdinal: JobQueued,
Expand Down
10 changes: 8 additions & 2 deletions internal/lookoutv2/conversions/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,22 @@ var (
}

swaggerGroup = &models.Group{
Aggregates: map[string]string{
Aggregates: map[string]interface{}{
"averageTimeInState": "3d",
"state": map[string]int{
"QUEUED": 321,
},
},
Count: 1000,
Name: "queue-1",
}

group = &model.JobGroup{
Aggregates: map[string]string{
Aggregates: map[string]interface{}{
"averageTimeInState": "3d",
"state": map[string]int{
"QUEUED": 321,
},
},
Count: 1000,
Name: "queue-1",
Expand Down
10 changes: 9 additions & 1 deletion internal/lookoutv2/gen/models/group.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/lookoutv2/gen/restapi/embedded_spec.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/lookoutv2/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Run struct {
}

type JobGroup struct {
Aggregates map[string]string
Aggregates map[string]interface{}
Count int64
Name string
}
Expand Down
133 changes: 133 additions & 0 deletions internal/lookoutv2/repository/aggregates.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package repository

import (
"fmt"

"github.com/pkg/errors"

"github.com/armadaproject/armada/internal/common/database/lookout"
"github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/internal/lookoutv2/model"
)

type QueryAggregator interface {
AggregateSql() (string, error)
}

type SqlFunctionAggregator struct {
queryCol *queryColumn
sqlFunction string
}

func NewSqlFunctionAggregator(queryCol *queryColumn, fn string) *SqlFunctionAggregator {
return &SqlFunctionAggregator{
queryCol: queryCol,
sqlFunction: fn,
}
}

func (qa *SqlFunctionAggregator) aggregateColName() string {
return qa.queryCol.name
}

func (qa *SqlFunctionAggregator) AggregateSql() (string, error) {
return fmt.Sprintf("%s(%s.%s) AS %s", qa.sqlFunction, qa.queryCol.abbrev, qa.queryCol.name, qa.aggregateColName()), nil
}

type StateCountAggregator struct {
queryCol *queryColumn
stateString string
}

func NewStateCountAggregator(queryCol *queryColumn, stateString string) *StateCountAggregator {
return &StateCountAggregator{
queryCol: queryCol,
stateString: stateString,
}
}

func (qa *StateCountAggregator) aggregateColName() string {
return fmt.Sprintf("%s_%s", qa.queryCol.name, qa.stateString)
}

func (qa *StateCountAggregator) AggregateSql() (string, error) {
stateInt, ok := lookout.JobStateOrdinalMap[lookout.JobState(qa.stateString)]
if !ok {
return "", errors.Errorf("state %s does not exist", qa.stateString)
}
return fmt.Sprintf(
"SUM(CASE WHEN %s.%s = %d THEN 1 ELSE 0 END) AS %s",
qa.queryCol.abbrev, qa.queryCol.name, stateInt, qa.aggregateColName(),
), nil
}

func GetAggregatorsForColumn(queryCol *queryColumn, aggregateType AggregateType, filters []*model.Filter) ([]QueryAggregator, error) {
switch aggregateType {
case Max:
return []QueryAggregator{NewSqlFunctionAggregator(queryCol, "MAX")}, nil
case Average:
return []QueryAggregator{NewSqlFunctionAggregator(queryCol, "AVG")}, nil
case StateCounts:
states := GetStatesForFilter(filters)
aggregators := make([]QueryAggregator, len(states))
for i, state := range states {
aggregators[i] = NewStateCountAggregator(queryCol, state)
}
return aggregators, nil
default:
return nil, errors.Errorf("cannot determine aggregate type: %v", aggregateType)
}
}

// GetStatesForFilter returns a list of states as string if filter for state exists
// Will always return the states in the same order, irrespective of the ordering of the states in the filter
func GetStatesForFilter(filters []*model.Filter) []string {
var stateFilter *model.Filter
for _, f := range filters {
if f.Field == stateField {
stateFilter = f
}
}
allStates := util.Map(lookout.JobStates, func(jobState lookout.JobState) string { return string(jobState) })
if stateFilter == nil {
// If no state filter is specified, use all states
return allStates
}

switch stateFilter.Match {
case model.MatchExact:
return []string{fmt.Sprintf("%s", stateFilter.Value)}
case model.MatchAnyOf:
strSlice, err := toStringSlice(stateFilter.Value)
if err != nil {
return allStates
}
stateStringSet := util.StringListToSet(strSlice)
// Ensuring they are in the same order
var finalStates []string
for _, state := range allStates {
if _, ok := stateStringSet[state]; ok {
finalStates = append(finalStates, state)
}
}
return finalStates
default:
return allStates
}
}

func toStringSlice(val interface{}) ([]string, error) {
switch v := val.(type) {
case []string:
return v, nil
case []interface{}:
result := make([]string, len(v))
for i := 0; i < len(v); i++ {
str := fmt.Sprintf("%v", v[i])
result[i] = str
}
return result, nil
default:
return nil, errors.Errorf("failed to convert interface to string slice: %v of type %T", val, val)
}
}
122 changes: 122 additions & 0 deletions internal/lookoutv2/repository/fieldparser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package repository

import (
"fmt"
"math"
"time"

"github.com/jackc/pgtype"
"github.com/pkg/errors"

"github.com/armadaproject/armada/internal/common/database/lookout"
"github.com/armadaproject/armada/internal/lookoutv2/model"
)

type FieldParser interface {
GetField() string
GetVariableRef() interface{}
ParseValue() (interface{}, error)
}

type LastTransitionTimeParser struct {
variable pgtype.Numeric
}

func (fp *LastTransitionTimeParser) GetField() string {
return lastTransitionTimeField
}

func (fp *LastTransitionTimeParser) GetVariableRef() interface{} {
return &fp.variable
}

func (fp *LastTransitionTimeParser) ParseValue() (interface{}, error) {
var dst float64
err := fp.variable.AssignTo(&dst)
if err != nil {
return "", err
}
t := time.Unix(int64(math.Round(dst)), 0)
return t.Format(time.RFC3339), nil
}

type TimeParser struct {
field string
variable time.Time
}

func (fp *TimeParser) GetField() string {
return fp.field
}

func (fp *TimeParser) GetVariableRef() interface{} {
return &fp.variable
}

func (fp *TimeParser) ParseValue() (interface{}, error) {
return fp.variable.Format(time.RFC3339), nil
}

type StateParser struct {
variable int16
}

func (fp *StateParser) GetField() string {
return stateField
}

func (fp *StateParser) GetVariableRef() interface{} {
return &fp.variable
}

func (fp *StateParser) ParseValue() (interface{}, error) {
state, ok := lookout.JobStateMap[int(fp.variable)]
if !ok {
return "", errors.Errorf("state not found: %d", fp.variable)
}
return string(state), nil
}

type BasicParser[T any] struct {
field string
variable T
}

func (fp *BasicParser[T]) GetField() string {
return fp.field
}

func (fp *BasicParser[T]) GetVariableRef() interface{} {
return &fp.variable
}

func (fp *BasicParser[T]) ParseValue() (interface{}, error) {
return fp.variable, nil
}

func ParserForGroup(field string) FieldParser {
switch field {
case stateField:
return &StateParser{}
default:
return &BasicParser[string]{field: field}
}
}

func ParsersForAggregate(field string, filters []*model.Filter) ([]FieldParser, error) {
var parsers []FieldParser
switch field {
case lastTransitionTimeField:
parsers = append(parsers, &LastTransitionTimeParser{})
case submittedField:
parsers = append(parsers, &TimeParser{field: submittedField})
case stateField:
states := GetStatesForFilter(filters)
for _, state := range states {
parsers = append(parsers, &BasicParser[int]{field: fmt.Sprintf("%s%s", stateAggregatePrefix, state)})
}
default:
return nil, errors.Errorf("no aggregate found for field %s", field)
}
return parsers, nil
}
Loading

0 comments on commit 5bf48cb

Please sign in to comment.