Skip to content

Commit

Permalink
feat(query): more typing
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasmalkmus committed Jul 17, 2023
1 parent 9a66d45 commit 0e1aa21
Show file tree
Hide file tree
Showing 9 changed files with 468 additions and 45 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ fmt: ## Format and simplify the source code using `gofmt`

.PHONY: generate
generate: \
axiom/query/aggregation_string.go \
axiom/querylegacy/aggregation_string.go \
axiom/querylegacy/filter_string.go \
axiom/querylegacy/kind_string.go \
Expand Down
57 changes: 50 additions & 7 deletions axiom/datasets_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (s *DatasetsTestSuite) Test() {
startTime := now.Add(-time.Minute)
endTime := now.Add(time.Minute)

// Run a simple APL query.
// Run a simple APL query...
apl := fmt.Sprintf("['%s']", s.dataset.ID)
queryResult, err := s.client.Datasets.Query(s.ctx, apl,
query.SetStartTime(startTime),
Expand All @@ -228,7 +228,7 @@ func (s *DatasetsTestSuite) Test() {
// FIXME(lukasmalkmus): Tabular results format is not yet returning
// the dataset name.
s.Equal("FIXME", table.Sources[0].Name)
// s.Equal(s.dataset.ID, table.Sources[0])
// s.Equal(s.dataset.ID, table.Sources[0].Name)
}

// FIXME(lukasmalkmus): Tabular results format is not yet returning the
Expand All @@ -239,6 +239,46 @@ func (s *DatasetsTestSuite) Test() {
// s.Len(table.Columns, 12) // 8 event fields + 1 label field + 3 system fields
}

// ... and a slightly more complex (analytic) APL query.
apl = fmt.Sprintf("['%s'] | summarize topk(remote_ip, 1)", s.dataset.ID)
queryResult, err = s.client.Datasets.Query(s.ctx, apl,
query.SetStartTime(startTime),
query.SetEndTime(endTime),
)
s.Require().NoError(err)
s.Require().NotNil(queryResult)

s.NotZero(queryResult.Status.ElapsedTime)
s.EqualValues(10, queryResult.Status.RowsExamined)
s.EqualValues(10, queryResult.Status.RowsMatched)
if s.Len(queryResult.Tables, 1) {
table := queryResult.Tables[0]

if s.Len(table.Sources, 1) {
// FIXME(lukasmalkmus): Tabular results format is not yet returning
// the dataset name.
s.Equal("FIXME", table.Sources[0].Name)
// s.Equal(s.dataset.ID, table.Sources[0].Name)
}

if s.Len(table.Fields, 1) && s.NotNil(table.Fields[0].Aggregation) {
agg := table.Fields[0].Aggregation

s.Equal(query.OpTopk, agg.Op)
s.Equal([]string{"remote_ip"}, agg.Fields)
s.Equal([]any{1.}, agg.Args)
}

if s.Len(table.Columns, 1) && s.Len(table.Columns[0], 1) {
v := table.Columns[0][0].([]any)
m := v[0].(map[string]any)

s.Equal("93.180.71.1", m["key"])
s.Equal(5., m["count"])
s.Equal(0., m["error"])
}
}

// Also run a legacy query and make sure we see some results.
legacyQueryResult, err := s.client.Datasets.QueryLegacy(s.ctx, s.dataset.ID, querylegacy.Query{
StartTime: startTime,
Expand Down Expand Up @@ -344,16 +384,15 @@ func (s *DatasetsTestSuite) TestCursor() {
)
s.Require().NoError(err)

// FIXME(lukasmalkmus): Tabular results format is not yet returning the
// _rowID column.
s.T().Skip()

// HINT(lukasmalkmus): Expecting four columns: _time, _sysTime, _rowID, foo.
// This is only checked once for the first query result to verify the
// dataset scheme. The following queries will only check the results in the
// columns.
// FIXME(lukasmalkmus): Tabular results format is not yet returning the
// _rowID column.
s.Require().Len(queryResult.Tables, 1)
s.Require().Len(queryResult.Tables[0].Columns, 4)
s.Require().Len(queryResult.Tables[0].Columns, 3)
// s.Require().Len(queryResult.Tables[0].Columns, 4)
s.Require().Len(queryResult.Tables[0].Columns[0], 3)

if s.Len(queryResult.Tables, 1) {
Expand All @@ -362,6 +401,10 @@ func (s *DatasetsTestSuite) TestCursor() {
s.Equal("bar", queryResult.Tables[0].Columns[2][2])
}

// FIXME(lukasmalkmus): Tabular results format is not yet returning the
// _rowID column.
s.T().Skip()

// HINT(lukasmalkmus): In a real-world scenario, the cursor would be
// retrieved from the query status MinCursor or MaxCursor fields, depending
// on the queries sort order.
Expand Down
26 changes: 14 additions & 12 deletions axiom/datasets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,50 +198,50 @@ var (
Fields: []query.Field{
{
Name: "_time",
Type: "string",
Type: query.TypeString,
},
{
Name: "_sysTime",
Type: "string",
Type: query.TypeString,
},
{
Name: "_rowId",
Type: "string",
Type: query.TypeString,
},
{
Name: "agent",
Type: "string",
Type: query.TypeString,
},
{
Name: "bytes",
Type: "float64",
Type: query.TypeReal,
},
{
Name: "referrer",
Type: "string",
Type: query.TypeString,
},
{
Name: "remote_ip",
Type: "string",
Type: query.TypeString,
},
{
Name: "remote_user",
Type: "string",
Type: query.TypeString,
},
{
Name: "request",
Type: "string",
Type: query.TypeString,
},
{
Name: "response",
Type: "float64",
Type: query.TypeReal,
},
{
Name: "time",
Type: "string",
Type: query.TypeString,
},
},
Range: &query.RangeInfo{
Range: &query.Range{
Field: "_time",
Start: parseTimeOrPanic("2023-03-21T13:38:51.735448191Z"),
End: parseTimeOrPanic("2023-03-28T13:38:51.735448191Z"),
Expand Down Expand Up @@ -1091,6 +1091,8 @@ func TestDatasetsService_Query(t *testing.T) {
assert.Equal(t, expQueryRes, res)
}

// TODO(lukasmalkmus): Add test for a query with an aggregation.

func TestDatasetsService_QueryLegacy(t *testing.T) {
hf := func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, http.MethodPost, r.Method)
Expand Down
134 changes: 134 additions & 0 deletions axiom/query/aggregation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package query

import (
"encoding/json"
"fmt"
"strings"
)

//go:generate go run golang.org/x/tools/cmd/stringer -type=AggregationOp -linecomment -output=aggregation_string.go

// An AggregationOp describes the [Aggregation] operation applied on a [Field].
type AggregationOp uint8

// All available [Aggregation] operations.
const (
OpUnknown AggregationOp = iota // unknown

OpCount // count
OpCountIf // countif
OpDistinct // distinct
OpDistinctIf // distinctif
OpSum // sum
OpSumIf // sumif
OpAvg // avg
OpAvgIf // avgif
OpMin // min
OpMinIf // minif
OpMax // max
OpMaxIf // maxif
OpTopk // topk
OpPercentiles // percentiles
OpHistogram // histogram
OpStandardDeviation // stdev
OpStandardDeviationIf // stdevif
OpVariance // variance
OpVarianceIf // varianceif
OpArgMin // argmin
OpArgMax // argmax
OpRate // rate
OpPearson // pearson_correlation
OpMakeSet // makeset
OpMakeSetIf // makesetif
OpMakeList // makelist
OpMakeListIf // makelistif
)

func aggregationOpFromString(s string) (op AggregationOp, err error) {
switch strings.ToLower(s) {
case OpCount.String():
op = OpCount
case OpCountIf.String():
op = OpCountIf
case OpDistinct.String():
op = OpDistinct
case OpDistinctIf.String():
op = OpDistinctIf
case OpSum.String():
op = OpSum
case OpSumIf.String():
op = OpSumIf
case OpAvg.String():
op = OpAvg
case OpAvgIf.String():
op = OpAvgIf
case OpMin.String():
op = OpMin
case OpMinIf.String():
op = OpMinIf
case OpMax.String():
op = OpMax
case OpMaxIf.String():
op = OpMaxIf
case OpTopk.String():
op = OpTopk
case OpPercentiles.String():
op = OpPercentiles
case OpHistogram.String():
op = OpHistogram
case OpStandardDeviation.String():
op = OpStandardDeviation
case OpStandardDeviationIf.String():
op = OpStandardDeviationIf
case OpVariance.String():
op = OpVariance
case OpVarianceIf.String():
op = OpVarianceIf
case OpArgMin.String():
op = OpArgMin
case OpArgMax.String():
op = OpArgMax
case OpRate.String():
op = OpRate
case OpPearson.String():
op = OpPearson
case OpMakeSet.String():
op = OpMakeSet
case OpMakeSetIf.String():
op = OpMakeSetIf
case OpMakeList.String():
op = OpMakeList
case OpMakeListIf.String():
op = OpMakeListIf
default:
return OpUnknown, fmt.Errorf("unknown aggregation operation: %s", s)
}

return op, nil
}

// UnmarshalJSON implements [json.Unmarshaler]. It is in place to unmarshal the
// AggregationOp from the string representation the server returns.
func (op *AggregationOp) UnmarshalJSON(b []byte) (err error) {
var s string
if err = json.Unmarshal(b, &s); err != nil {
return err
}

*op, err = aggregationOpFromString(s)

return err
}

// Aggregation that is applied to a [Field] in a [Table].
type Aggregation struct {
// Op is the aggregation operation. If the aggregation is aliased, the alias
// is stored in the parent [Field.Name].
Op AggregationOp `json:"name"`
// Fields specifies the names of the fields this aggregation is computed on.
// E.g. ["players"] for "topk(players, 10)".
Fields []string `json:"fields"`
// Args are the non-field arguments of the aggregation, if any. E.g. "10"
// for "topk(players, 10)".
Args []any `json:"args"`
}
50 changes: 50 additions & 0 deletions axiom/query/aggregation_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 0e1aa21

Please sign in to comment.