feat(query): tabular result format
lukasmalkmus committed Apr 15, 2024
1 parent 2954af7 commit d1468d3
Showing 8 changed files with 523 additions and 711 deletions.
1 change: 0 additions & 1 deletion Makefile
@@ -74,7 +74,6 @@ fmt: ## Format and simplify the source code using `gofmt`

.PHONY: generate
generate: \
axiom/query/result_string.go \
axiom/querylegacy/aggregation_string.go \
axiom/querylegacy/filter_string.go \
axiom/querylegacy/kind_string.go \
112 changes: 37 additions & 75 deletions axiom/datasets.go
@@ -135,42 +135,10 @@ type aplQueryRequest struct {
type aplQueryResponse struct {
query.Result

// HINT(lukasmalkmus): Ignore these fields as they are not relevant for the
// user and/or will change with the new query result format.
LegacyRequest struct {
StartTime any `json:"startTime"`
EndTime any `json:"endTime"`
Resolution any `json:"resolution"`
Aggregations any `json:"aggregations"`
Filter any `json:"filter"`
Order any `json:"order"`
Limit any `json:"limit"`
VirtualFields any `json:"virtualFields"`
Projections any `json:"project"`
Cursor any `json:"cursor"`
IncludeCursor any `json:"includeCursor"`
ContinuationToken any `json:"continuationToken"`

// HINT(lukasmalkmus): Preserve the legacy request's "groupBy"
// field for now. This is needed to properly render some results.
GroupBy []string `json:"groupBy"`
} `json:"request"`
FieldsMeta any `json:"fieldsMetaMap"`
}

// UnmarshalJSON implements [json.Unmarshaler]. It is in place to unmarshal the
// groupBy field of the legacy request that is part of the response into the
// actual [query.Result.GroupBy] field.
func (r *aplQueryResponse) UnmarshalJSON(b []byte) error {
type localResponse *aplQueryResponse

if err := json.Unmarshal(b, localResponse(r)); err != nil {
return err
}

r.GroupBy = r.LegacyRequest.GroupBy

return nil
Format any `json:"format"`
Request any `json:"request"`
DatasetNames any `json:"datasetNames"`
FieldsMetaMap any `json:"fieldsMetaMap"`
}
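
For context, the practical upshot of this change: query results are now table- and column-oriented instead of match-based. Below is a minimal sketch of consuming the new format — the client setup and the field.Name accessor are assumptions for illustration; Tables, Fields, Columns, and Aggregation are the shapes exercised by the integration tests further down.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/axiomhq/axiom-go/axiom"
)

func main() {
	// Assumes credentials are configured in the environment.
	client, err := axiom.NewClient()
	if err != nil {
		log.Fatal(err)
	}

	res, err := client.Datasets.Query(context.Background(), "['my-dataset'] | take 10")
	if err != nil {
		log.Fatal(err)
	}

	for _, table := range res.Tables {
		// The result is column-oriented: Columns[i] holds one value per
		// row for the field described by Fields[i].
		for i, field := range table.Fields {
			for _, value := range table.Columns[i] {
				fmt.Printf("%s: %v\n", field.Name, value) // field.Name is assumed
			}
		}
	}
}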

// DatasetsService handles communication with the dataset related operations of
@@ -373,7 +341,7 @@ func (s *DatasetsService) Ingest(ctx context.Context, id string, r io.Reader, ty
}
res.TraceID = resp.TraceID()

setIngestResultOnSpan(span, res)
setIngestStatusOnSpan(span, res)

return &res, nil
}
@@ -478,7 +446,7 @@ func (s *DatasetsService) IngestEvents(ctx context.Context, id string, events []
}
res.TraceID = resp.TraceID()

setIngestResultOnSpan(span, res)
setIngestStatusOnSpan(span, res)

return &res, nil
}
@@ -535,7 +503,7 @@ func (s *DatasetsService) IngestChannel(ctx context.Context, id string, events <

var ingestStatus ingest.Status
defer func() {
setIngestResultOnSpan(span, ingestStatus)
setIngestStatusOnSpan(span, ingestStatus)
}()

flush := func() error {
@@ -608,7 +576,7 @@ func (s *DatasetsService) Query(ctx context.Context, apl string, options ...quer
queryParams := struct {
Format string `url:"format"`
}{
Format: "legacy", // Hardcode legacy APL format for now.
Format: "tabular", // Hardcode tabular result format for now.
}

path, err := url.JoinPath("/v1/datasets", "_apl")
@@ -636,7 +604,8 @@ func (s *DatasetsService) Query(ctx context.Context, apl string, options ...quer
}
res.TraceID = resp.TraceID()

setQueryResultOnSpan(span, res.Result)
setQueryStatusOnSpan(span, res.Result.Status)
span.SetAttributes(attribute.String("axiom.result.trace_id", res.TraceID))

return &res.Result, nil
}
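
Concretely, the hardcoded format above is serialized into the query string via its `url:"format"` struct tag, so the SDK now issues requests along these lines (the JSON body shape is illustrative, not part of this diff):

POST /v1/datasets/_apl?format=tabular
{"apl": "['my-dataset'] | take 10"}

Previously, the same endpoint was called with format=legacy.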
@@ -688,7 +657,8 @@ func (s *DatasetsService) QueryLegacy(ctx context.Context, id string, q queryleg
res.SavedQueryID = resp.Header.Get("X-Axiom-History-Query-Id")
res.TraceID = resp.TraceID()

setLegacyQueryResultOnSpan(span, res.Result)
setLegacyQueryStatusOnSpan(span, res.Result.Status)
span.SetAttributes(attribute.String("axiom.result.trace_id", res.TraceID))

return &res.Result, nil
}
@@ -740,60 +710,52 @@ func DetectContentType(r io.Reader) (io.Reader, ContentType, error) {
return r, typ, nil
}

func setIngestResultOnSpan(span trace.Span, res ingest.Status) {
func setIngestStatusOnSpan(span trace.Span, status ingest.Status) {
if !span.IsRecording() {
return
}

span.SetAttributes(
attribute.String("axiom.result.trace_id", res.TraceID),
attribute.Int64("axiom.events.ingested", int64(res.Ingested)),
attribute.Int64("axiom.events.failed", int64(res.Failed)),
attribute.Int64("axiom.events.processed_bytes", int64(res.ProcessedBytes)),
attribute.String("axiom.result.trace_id", status.TraceID),
attribute.Int64("axiom.events.ingested", int64(status.Ingested)),
attribute.Int64("axiom.events.failed", int64(status.Failed)),
attribute.Int64("axiom.events.processed_bytes", int64(status.ProcessedBytes)),
)
}

//nolint:dupl // We need to support both query packages and their types.
func setQueryResultOnSpan(span trace.Span, res query.Result) {
func setQueryStatusOnSpan(span trace.Span, status query.Status) {
if !span.IsRecording() {
return
}

span.SetAttributes(
attribute.String("axiom.result.trace_id", res.TraceID),
attribute.String("axiom.result.status.elapsed_time", res.Status.ElapsedTime.String()),
attribute.Int64("axiom.result.status.blocks_examined", int64(res.Status.BlocksExamined)),
attribute.Int64("axiom.result.status.rows_examined", int64(res.Status.RowsExamined)),
attribute.Int64("axiom.result.status.rows_matched", int64(res.Status.RowsMatched)),
attribute.Int64("axiom.result.status.num_groups", int64(res.Status.NumGroups)),
attribute.Bool("axiom.result.status.is_partial", res.Status.IsPartial),
attribute.Bool("axiom.result.status.is_estimate", res.Status.IsEstimate),
attribute.String("axiom.result.status.min_block_time", res.Status.MinBlockTime.String()),
attribute.String("axiom.result.status.max_block_time", res.Status.MaxBlockTime.String()),
attribute.String("axiom.result.status.min_cursor", res.Status.MinCursor),
attribute.String("axiom.result.status.max_cursor", res.Status.MaxCursor),
attribute.String("axiom.result.status.min_cursor", status.MinCursor),
attribute.String("axiom.result.status.max_cursor", status.MaxCursor),
attribute.String("axiom.query.min_cursor", status.MinCursor),
attribute.String("axiom.query.max_cursor", status.MaxCursor),
attribute.String("axiom.query.elapsed_time", status.ElapsedTime.String()),
attribute.Int64("axiom.query.rows_examined", int64(status.RowsExamined)),
attribute.Int64("axiom.query.rows_matched", int64(status.RowsMatched)),
)
}

//nolint:dupl // We need to support both query packages and their types.
func setLegacyQueryResultOnSpan(span trace.Span, res querylegacy.Result) {
func setLegacyQueryStatusOnSpan(span trace.Span, status querylegacy.Status) {
if !span.IsRecording() {
return
}

span.SetAttributes(
attribute.String("axiom.result.trace_id", res.TraceID),
attribute.String("axiom.result.status.elapsed_time", res.Status.ElapsedTime.String()),
attribute.Int64("axiom.result.status.blocks_examined", int64(res.Status.BlocksExamined)),
attribute.Int64("axiom.result.status.rows_examined", int64(res.Status.RowsExamined)),
attribute.Int64("axiom.result.status.rows_matched", int64(res.Status.RowsMatched)),
attribute.Int64("axiom.result.status.num_groups", int64(res.Status.NumGroups)),
attribute.Bool("axiom.result.status.is_partial", res.Status.IsPartial),
attribute.Bool("axiom.result.status.is_estimate", res.Status.IsEstimate),
attribute.String("axiom.result.status.min_block_time", res.Status.MinBlockTime.String()),
attribute.String("axiom.result.status.max_block_time", res.Status.MaxBlockTime.String()),
attribute.String("axiom.result.status.min_cursor", res.Status.MinCursor),
attribute.String("axiom.result.status.max_cursor", res.Status.MaxCursor),
attribute.String("axiom.querylegacy.elapsed_time", status.ElapsedTime.String()),
attribute.Int64("axiom.querylegacy.blocks_examined", int64(status.BlocksExamined)),
attribute.Int64("axiom.querylegacy.rows_examined", int64(status.RowsExamined)),
attribute.Int64("axiom.querylegacy.rows_matched", int64(status.RowsMatched)),
attribute.Int64("axiom.querylegacy.num_groups", int64(status.NumGroups)),
attribute.Bool("axiom.querylegacy.is_partial", status.IsPartial),
attribute.Bool("axiom.querylegacy.is_estimate", status.IsEstimate),
attribute.String("axiom.querylegacy.min_block_time", status.MinBlockTime.String()),
attribute.String("axiom.querylegacy.max_block_time", status.MaxBlockTime.String()),
attribute.String("axiom.querylegacy.min_cursor", status.MinCursor),
attribute.String("axiom.querylegacy.max_cursor", status.MaxCursor),
)
}

112 changes: 90 additions & 22 deletions axiom/datasets_integration_test.go
@@ -246,9 +246,64 @@ func (s *DatasetsTestSuite) Test() {
s.Require().NoError(err)
s.Require().NotNil(queryResult)

s.NotZero(queryResult.Status.ElapsedTime)
s.EqualValues(14, queryResult.Status.RowsExamined)
s.EqualValues(14, queryResult.Status.RowsMatched)
s.Len(queryResult.Matches, 14)
if s.Len(queryResult.Tables, 1) {
table := queryResult.Tables[0]

if s.Len(table.Sources, 1) {
// FIXME(lukasmalkmus): Uncomment once there is consensus on the
// source name format.
// s.Equal(s.dataset.ID, table.Sources[0].Name)
}

// FIXME(lukasmalkmus): The tabular result format does not yet return the
// _rowID column.
s.Len(table.Fields, 11) // 8 event fields + 1 label field + 2 system fields
s.Len(table.Columns, 11) // 8 event fields + 1 label field + 2 system fields
// s.Len(table.Fields, 12) // 8 event fields + 1 label field + 3 system fields
// s.Len(table.Columns, 12) // 8 event fields + 1 label field + 3 system fields
}

// ... and a slightly more complex (analytic) APL query.
apl = fmt.Sprintf("['%s'] | summarize topk(remote_ip, 1)", s.dataset.ID)
queryResult, err = s.client.Datasets.Query(s.ctx, apl,
query.SetStartTime(startTime),
query.SetEndTime(endTime),
)
s.Require().NoError(err)
s.Require().NotNil(queryResult)

s.NotZero(queryResult.Status.ElapsedTime)
s.EqualValues(14, queryResult.Status.RowsExamined)
s.EqualValues(14, queryResult.Status.RowsMatched)
if s.Len(queryResult.Tables, 1) {
table := queryResult.Tables[0]

if s.Len(table.Sources, 1) {
// FIXME(lukasmalkmus): Uncomment once there is consensus on the
// source name format.
// s.Equal(s.dataset.ID, table.Sources[0].Name)
}

if s.Len(table.Fields, 1) && s.NotNil(table.Fields[0].Aggregation) {
agg := table.Fields[0].Aggregation

s.Equal(query.OpTopk, agg.Op)
s.Equal([]string{"remote_ip"}, agg.Fields)
s.Equal([]any{1.}, agg.Args)
}

if s.Len(table.Columns, 1) && s.Len(table.Columns[0], 1) {
v := table.Columns[0][0].([]any)
m := v[0].(map[string]any)

s.Equal("93.180.71.1", m["key"])
s.Equal(7., m["count"])
s.Equal(0., m["error"])
}
}

// Also run a legacy query and make sure we see some results.
legacyQueryResult, err := s.client.Datasets.QueryLegacy(s.ctx, s.dataset.ID, querylegacy.Query{
@@ -258,6 +313,7 @@ func (s *DatasetsTestSuite) Test() {
s.Require().NoError(err)
s.Require().NotNil(legacyQueryResult)

s.NotZero(legacyQueryResult.Status.ElapsedTime)
s.EqualValues(14, legacyQueryResult.Status.RowsExamined)
s.EqualValues(14, legacyQueryResult.Status.RowsMatched)
s.Len(legacyQueryResult.Matches, 14)
@@ -335,16 +391,16 @@ func (s *DatasetsTestSuite) TestCursor() {
now := time.Now().Truncate(time.Second)
_, err := s.client.Datasets.IngestEvents(s.ctx, s.dataset.ID, []axiom.Event{
{ // Oldest
"_time": now.Add(-time.Second * 3),
"foo": "bar",
ingest.TimestampField: now.Add(-time.Second * 3),
"foo": "bar",
},
{
"_time": now.Add(-time.Second * 2),
"foo": "baz",
ingest.TimestampField: now.Add(-time.Second * 2),
"foo": "baz",
},
{ // Newest
"_time": now.Add(-time.Second * 1),
"foo": "buz",
ingest.TimestampField: now.Add(-time.Second * 1),
"foo": "buz",
},
})
s.Require().NoError(err)
Expand All @@ -360,16 +416,28 @@ func (s *DatasetsTestSuite) TestCursor() {
)
s.Require().NoError(err)

if s.Len(queryResult.Matches, 3) {
s.Equal("buz", queryResult.Matches[0].Data["foo"])
s.Equal("baz", queryResult.Matches[1].Data["foo"])
s.Equal("bar", queryResult.Matches[2].Data["foo"])
// FIXME(lukasmalkmus): The tabular result format does not yet return the
// _rowID column.
s.T().Skip()

// HINT(lukasmalkmus): Expecting four columns: _time, _sysTime, _rowID, foo.
// This is only checked once, for the first query result, to verify the
// dataset schema. The following queries only check the values in the
// columns.
s.Require().Len(queryResult.Tables, 1)
s.Require().Len(queryResult.Tables[0].Columns, 4)
s.Require().Len(queryResult.Tables[0].Columns[0], 3)

if s.Len(queryResult.Tables, 1) {
s.Equal("buz", queryResult.Tables[0].Columns[2][0])
s.Equal("baz", queryResult.Tables[0].Columns[2][1])
s.Equal("bar", queryResult.Tables[0].Columns[2][2])
}

// HINT(lukasmalkmus): In a real-world scenario, the cursor would be
// retrieved from the query status MinCursor or MaxCursor fields, depending
// on the query's sort order (see the pagination sketch after this test).
midRowID := queryResult.Matches[1].RowID
midRowID := queryResult.Tables[0].Columns[0][2].(string)

// Query events with a cursor in descending order...
apl = fmt.Sprintf("['%s'] | sort by _time desc", s.dataset.ID)
@@ -382,8 +450,8 @@

// "buz" and "baz" skipped by the cursor, only "bar" is returned. The cursor
// is exclusive, so "baz" is not included.
if s.Len(queryResult.Matches, 1) {
s.Equal("bar", queryResult.Matches[0].Data["foo"])
if s.Len(queryResult.Tables[0].Columns[0], 1) {
s.Equal("bar", queryResult.Tables[0].Columns[0][0])
}

// ...again, but with the cursor inclusive.
@@ -396,9 +464,9 @@

// "buz" skipped by the cursor, only "baz" and "bar" is returned. The cursor
// is inclusive, so "baz" is included.
if s.Len(queryResult.Matches, 2) {
s.Equal("baz", queryResult.Matches[0].Data["foo"])
s.Equal("bar", queryResult.Matches[1].Data["foo"])
if s.Len(queryResult.Tables[0].Columns[0], 2) {
s.Equal("baz", queryResult.Tables[0].Columns[0][0])
s.Equal("bar", queryResult.Tables[0].Columns[0][1])
}

// Query events with a cursor in ascending order...
@@ -412,8 +480,8 @@

// "bar" and "baz" skipped by the cursor, only "buz" is returned. The cursor
// is exclusive, so "baz" is not included.
if s.Len(queryResult.Matches, 1) {
s.Equal("buz", queryResult.Matches[0].Data["foo"])
if s.Len(queryResult.Tables[0].Columns[0], 1) {
s.Equal("buz", queryResult.Tables[0].Columns[0][0])
}

// ...again, but with the cursor inclusive.
@@ -426,9 +494,9 @@

// "bar" skipped by the cursor, only "baz" and "buz" is returned. The cursor
// is inclusive, so "baz" is included.
if s.Len(queryResult.Matches, 2) {
s.Equal("baz", queryResult.Matches[0].Data["foo"])
s.Equal("buz", queryResult.Matches[1].Data["foo"])
if s.Len(queryResult.Tables[0].Columns[0], 2) {
s.Equal("baz", queryResult.Tables[0].Columns[0][0])
s.Equal("buz", queryResult.Tables[0].Columns[0][1])
}
}
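
For reference, a sketch of the real-world flow hinted at above: derive the next page's cursor from Status.MinCursor (descending sort) or Status.MaxCursor (ascending sort). The pageThrough helper and the query.SetCursor signature are assumptions; SetStartTime, SetEndTime, and the exclusive-by-default cursor semantics come from this diff.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/axiomhq/axiom-go/axiom"
	"github.com/axiomhq/axiom-go/axiom/query"
)

// pageThrough pages through datasetID newest-first. With a descending sort,
// each page continues from the minimum cursor of the previous result.
func pageThrough(ctx context.Context, client *axiom.Client, datasetID string, start, end time.Time) error {
	apl := fmt.Sprintf("['%s'] | sort by _time desc | limit 1000", datasetID)

	var cursor string
	for {
		opts := []query.Option{
			query.SetStartTime(start),
			query.SetEndTime(end),
		}
		if cursor != "" {
			// The cursor is exclusive, as exercised by the test above;
			// pass true instead of false to include the cursor row itself.
			opts = append(opts, query.SetCursor(cursor, false))
		}

		res, err := client.Datasets.Query(ctx, apl, opts...)
		if err != nil {
			return err
		}
		if len(res.Tables) == 0 || len(res.Tables[0].Columns) == 0 || len(res.Tables[0].Columns[0]) == 0 {
			return nil // no more rows
		}

		// ... process res.Tables[0] ...

		// Descending sort: the oldest row seen so far is the minimum cursor.
		cursor = res.Status.MinCursor
	}
}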

