Skip to content

Commit

Permalink
feat(query): iterator for columns and rows
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasmalkmus committed Apr 15, 2024
1 parent d1468d3 commit 2b368be
Show file tree
Hide file tree
Showing 9 changed files with 369 additions and 11 deletions.
17 changes: 12 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,29 @@ func main() {

client, err := axiom.NewClient()
if err != nil {
log.Fatalln(err)
log.Fatal(err)
}

if _, err = client.IngestEvents(ctx, "my-dataset", []axiom.Event{
{ingest.TimestampField: time.Now(), "foo": "bar"},
{ingest.TimestampField: time.Now(), "bar": "foo"},
}); err != nil {
log.Fatalln(err)
log.Fatal(err)
}

res, err := client.Query(ctx, "['my-dataset'] | where foo == 'bar' | limit 100")
if err != nil {
log.Fatalln(err)
log.Fatal(err)
} else if res.Status.RowsMatched == 0 {
log.Fatal("No matches found")
}
for _, match := range res.Matches {
fmt.Println(match.Data)

rows := res.Tables[0].Rows()
if err := rows.Range(ctx, func(_ context.Context, row query.Row) error {
_, err := fmt.Println(row)
return err
}); err != nil {
log.Fatal(err)
}
}
```
Expand Down
13 changes: 13 additions & 0 deletions axiom/query/iter/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Package iter provides a generic iterator implementation and helper functions
// to construct iterators from slices and ranges.
//
// To construct an [Iter], use the [Range] or [Slice] functions:
//
// // Construct an iterator that returns a, b and c on successive calls.
// slice := []string{"a", "b", "c"}
// itr := iter.Slice(slice, func(_ context.Context, item string) (string, error) {
// return item, nil
// })
//
// An [Iter] always returns a [Done] error when it is exhausted.
package iter
95 changes: 95 additions & 0 deletions axiom/query/iter/iter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package iter

import (
"context"
"errors"
)

// Done is returned if the iterator does not contain any more elements.
//
//nolint:revive,stylecheck // No leading "Err" as "Done" is like [io.EOF].
var Done = errors.New("no more elements in iterator")

// Element is a type that can be iterated over.
type Element any

// Iter is a function that returns the next element in the iterator. It returns
// the Done error if the iterator does not contain any more elements.
type Iter[T Element] func(context.Context) (T, error)

// Range creates an iterator that executes the given function for each index in
// the specified range.
func Range[T Element](start, end int, f func(context.Context, int) (T, error)) Iter[T] {
var idx = start
return func(ctx context.Context) (t T, err error) {
if ctx.Err() != nil {
return t, ctx.Err()
}
if idx > end {
return t, Done
}
t, err = f(ctx, idx)
idx++
return
}
}

// Slice creates an iterator that executes the given function for each element
// in the slice.
func Slice[T Element](slice []T, f func(context.Context, T) (T, error)) Iter[T] {
var (
idx = 0
end = len(slice) - 1
)
return func(ctx context.Context) (t T, err error) {
if ctx.Err() != nil {
return t, ctx.Err()
}
if idx > end {
return t, Done
}
t, err = f(ctx, slice[idx])
idx++
return
}
}

// Next returns the next [Element] in the iterator.
func (itr Iter[T]) Next(ctx context.Context) (T, error) {
return itr(ctx)
}

// Take returns up to n elements from the iterator. The iterator is only
// guaranteed to return a slice of length n if the error is [nil].
func (itr Iter[T]) Take(ctx context.Context, n int) ([]T, error) {
res := make([]T, n)
for i := 0; i < n; i++ {
if ctx.Err() != nil {
return res[:i], ctx.Err()
}
var err error
if res[i], err = itr.Next(ctx); err != nil {
return res[:i], err
}
}
return res, nil
}

// Range executes the given function for each [Element] in the iterator until it
// is exhausted in which case it returns [nil] instead of [Done].
func (itr Iter[T]) Range(ctx context.Context, f func(context.Context, T) error) error {
for {
if err := ctx.Err(); err != nil {
return err
}
t, err := itr.Next(ctx)
if err != nil {
if err == Done {
return nil
}
return err
} else if err := f(ctx, t); err != nil {
return err
}
}
}
124 changes: 124 additions & 0 deletions axiom/query/iter/iter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package iter_test

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/axiomhq/axiom-go/axiom/query/iter"
)

func TestRange(t *testing.T) {
itr := iter.Range(1, 2, func(_ context.Context, idx int) (int, error) {
return idx, nil
})

ctx := context.Background()

res, err := itr.Next(ctx)
require.NoError(t, err)

assert.Equal(t, 1, res)

res, err = itr.Next(ctx)
require.NoError(t, err)

assert.Equal(t, 2, res)

res, err = itr.Next(ctx)
require.Error(t, err)

assert.Equal(t, iter.Done, err)
assert.Zero(t, res)
}

func TestSlice(t *testing.T) {
slice := []int{1, 2}
itr := iter.Slice(slice, func(_ context.Context, item int) (int, error) {
return item, nil
})

ctx := context.Background()

res, err := itr.Next(ctx)
require.NoError(t, err)

assert.Equal(t, 1, res)

res, err = itr.Next(ctx)
require.NoError(t, err)

assert.Equal(t, 2, res)

res, err = itr.Next(ctx)
require.Error(t, err)

assert.Equal(t, iter.Done, err)
assert.Zero(t, res)
}

func TestIter_Next(t *testing.T) {
itr := iter.Iter[int](func(context.Context) (int, error) {
return 1, nil
})

ctx := context.Background()

res1, _ := itr(ctx)
res2, _ := itr.Next(ctx)

assert.Equal(t, res1, res2)
}

func TestIter_Take(t *testing.T) {
itr := iter.Iter[int](func(context.Context) (int, error) {
return 1, nil
})

ctx := context.Background()

res, _ := itr.Take(ctx, 3)
if assert.Len(t, res, 3) {
assert.Equal(t, []int{1, 1, 1}, res)
}
}

func TestIter_Take_Error(t *testing.T) {
var count int
itr := iter.Iter[int](func(context.Context) (int, error) {
if count > 1 {
return 0, errors.New("an error")
}
count++
return 1, nil
})

ctx := context.Background()

res, err := itr.Take(ctx, 3)

if assert.Error(t, err) {
assert.EqualError(t, err, "an error")
}
if assert.Len(t, res, 2) {
assert.Equal(t, []int{1, 1}, res)
}
}

func TestIter_Range(t *testing.T) {
itr := iter.Range(1, 5, func(_ context.Context, idx int) (int, error) {
return idx, nil
})

var res int
err := itr.Range(context.Background(), func(_ context.Context, i int) error {
res += i
return nil
})
require.NoError(t, err)

assert.Equal(t, 15, res)
}
15 changes: 15 additions & 0 deletions axiom/query/result.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package query

import (
"context"
"encoding/json"
"time"

"github.com/axiomhq/axiom-go/axiom/query/iter"
)

// Result is the result of an APL query.
Expand Down Expand Up @@ -43,6 +46,11 @@ type Table struct {
Columns []Column `json:"columns"`
}

// Rows returns an iterator over the rows build from the columns the table.
func (t Table) Rows() iter.Iter[Row] {
return Rows(t.Columns)
}

// Field in a [Table].
type Field struct {
// Name of the field.
Expand Down Expand Up @@ -110,6 +118,13 @@ type BucketInfo struct {
// Column in a [Table] containing the raw values of a [Field].
type Column []any

// Values returns an iterator over the values of the column.
func (c Column) Values() iter.Iter[any] {
return iter.Slice(c, func(_ context.Context, v any) (any, error) {
return v, nil
})
}

// Status of an APL query [Result].
type Status struct {
// MinCursor is the id of the oldest row, as seen server side. May be lower
Expand Down
41 changes: 41 additions & 0 deletions axiom/query/row.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package query

import (
"context"

"github.com/axiomhq/axiom-go/axiom/query/iter"
)

// Row represents a single row of a tabular query [Result].
type Row []any

// Values returns an iterator over the values of the row.
func (r Row) Values() iter.Iter[any] {
return iter.Slice(r, func(_ context.Context, v any) (any, error) {
return v, nil
})
}

// Rows returns an iterator over the rows build from the columns of a tabular
// query [Result].
func Rows(columns []Column) iter.Iter[Row] {
// Return an empty iterator if there are no columns or column values.
if len(columns) == 0 || len(columns[0]) == 0 {
return func(context.Context) (Row, error) {
return nil, iter.Done
}
}

return iter.Range(0, len(columns[0]), func(_ context.Context, idx int) (Row, error) {
if idx >= len(columns[0]) {
return nil, iter.Done
}

row := make(Row, len(columns))
for columnIdx, column := range columns {
row[columnIdx] = column[idx]
}

return row, nil
})
}
Loading

0 comments on commit 2b368be

Please sign in to comment.