From 2b368be65be65423f8ca51cb4306998f59aa62f9 Mon Sep 17 00:00:00 2001 From: Lukas Malkmus Date: Wed, 25 Jan 2023 00:02:27 +0100 Subject: [PATCH] feat(query): iterator for columns and rows --- README.md | 17 +++-- axiom/query/iter/doc.go | 13 ++++ axiom/query/iter/iter.go | 95 ++++++++++++++++++++++++++ axiom/query/iter/iter_test.go | 124 ++++++++++++++++++++++++++++++++++ axiom/query/result.go | 15 ++++ axiom/query/row.go | 41 +++++++++++ axiom/query/row_test.go | 54 +++++++++++++++ examples/README.md | 2 +- examples/query/main.go | 19 ++++-- 9 files changed, 369 insertions(+), 11 deletions(-) create mode 100644 axiom/query/iter/doc.go create mode 100644 axiom/query/iter/iter.go create mode 100644 axiom/query/iter/iter_test.go create mode 100644 axiom/query/row.go create mode 100644 axiom/query/row_test.go diff --git a/README.md b/README.md index 1f679d8d..d21ded9f 100644 --- a/README.md +++ b/README.md @@ -87,22 +87,29 @@ func main() { client, err := axiom.NewClient() if err != nil { - log.Fatalln(err) + log.Fatal(err) } if _, err = client.IngestEvents(ctx, "my-dataset", []axiom.Event{ {ingest.TimestampField: time.Now(), "foo": "bar"}, {ingest.TimestampField: time.Now(), "bar": "foo"}, }); err != nil { - log.Fatalln(err) + log.Fatal(err) } res, err := client.Query(ctx, "['my-dataset'] | where foo == 'bar' | limit 100") if err != nil { - log.Fatalln(err) + log.Fatal(err) + } else if res.Status.RowsMatched == 0 { + log.Fatal("No matches found") } - for _, match := range res.Matches { - fmt.Println(match.Data) + + rows := res.Tables[0].Rows() + if err := rows.Range(ctx, func(_ context.Context, row query.Row) error { + _, err := fmt.Println(row) + return err + }); err != nil { + log.Fatal(err) } } ``` diff --git a/axiom/query/iter/doc.go b/axiom/query/iter/doc.go new file mode 100644 index 00000000..b2b20984 --- /dev/null +++ b/axiom/query/iter/doc.go @@ -0,0 +1,13 @@ +// Package iter provides a generic iterator implementation and helper functions +// to construct iterators from slices and ranges. +// +// To construct an [Iter], use the [Range] or [Slice] functions: +// +// // Construct an iterator that returns a, b and c on successive calls. +// slice := []string{"a", "b", "c"} +// itr := iter.Slice(slice, func(_ context.Context, item string) (string, error) { +// return item, nil +// }) +// +// An [Iter] always returns a [Done] error when it is exhausted. +package iter diff --git a/axiom/query/iter/iter.go b/axiom/query/iter/iter.go new file mode 100644 index 00000000..b0903fda --- /dev/null +++ b/axiom/query/iter/iter.go @@ -0,0 +1,95 @@ +package iter + +import ( + "context" + "errors" +) + +// Done is returned if the iterator does not contain any more elements. +// +//nolint:revive,stylecheck // No leading "Err" as "Done" is like [io.EOF]. +var Done = errors.New("no more elements in iterator") + +// Element is a type that can be iterated over. +type Element any + +// Iter is a function that returns the next element in the iterator. It returns +// the Done error if the iterator does not contain any more elements. +type Iter[T Element] func(context.Context) (T, error) + +// Range creates an iterator that executes the given function for each index in +// the specified range. +func Range[T Element](start, end int, f func(context.Context, int) (T, error)) Iter[T] { + var idx = start + return func(ctx context.Context) (t T, err error) { + if ctx.Err() != nil { + return t, ctx.Err() + } + if idx > end { + return t, Done + } + t, err = f(ctx, idx) + idx++ + return + } +} + +// Slice creates an iterator that executes the given function for each element +// in the slice. +func Slice[T Element](slice []T, f func(context.Context, T) (T, error)) Iter[T] { + var ( + idx = 0 + end = len(slice) - 1 + ) + return func(ctx context.Context) (t T, err error) { + if ctx.Err() != nil { + return t, ctx.Err() + } + if idx > end { + return t, Done + } + t, err = f(ctx, slice[idx]) + idx++ + return + } +} + +// Next returns the next [Element] in the iterator. +func (itr Iter[T]) Next(ctx context.Context) (T, error) { + return itr(ctx) +} + +// Take returns up to n elements from the iterator. The iterator is only +// guaranteed to return a slice of length n if the error is [nil]. +func (itr Iter[T]) Take(ctx context.Context, n int) ([]T, error) { + res := make([]T, n) + for i := 0; i < n; i++ { + if ctx.Err() != nil { + return res[:i], ctx.Err() + } + var err error + if res[i], err = itr.Next(ctx); err != nil { + return res[:i], err + } + } + return res, nil +} + +// Range executes the given function for each [Element] in the iterator until it +// is exhausted in which case it returns [nil] instead of [Done]. +func (itr Iter[T]) Range(ctx context.Context, f func(context.Context, T) error) error { + for { + if err := ctx.Err(); err != nil { + return err + } + t, err := itr.Next(ctx) + if err != nil { + if err == Done { + return nil + } + return err + } else if err := f(ctx, t); err != nil { + return err + } + } +} diff --git a/axiom/query/iter/iter_test.go b/axiom/query/iter/iter_test.go new file mode 100644 index 00000000..6b6c8ee8 --- /dev/null +++ b/axiom/query/iter/iter_test.go @@ -0,0 +1,124 @@ +package iter_test + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/axiomhq/axiom-go/axiom/query/iter" +) + +func TestRange(t *testing.T) { + itr := iter.Range(1, 2, func(_ context.Context, idx int) (int, error) { + return idx, nil + }) + + ctx := context.Background() + + res, err := itr.Next(ctx) + require.NoError(t, err) + + assert.Equal(t, 1, res) + + res, err = itr.Next(ctx) + require.NoError(t, err) + + assert.Equal(t, 2, res) + + res, err = itr.Next(ctx) + require.Error(t, err) + + assert.Equal(t, iter.Done, err) + assert.Zero(t, res) +} + +func TestSlice(t *testing.T) { + slice := []int{1, 2} + itr := iter.Slice(slice, func(_ context.Context, item int) (int, error) { + return item, nil + }) + + ctx := context.Background() + + res, err := itr.Next(ctx) + require.NoError(t, err) + + assert.Equal(t, 1, res) + + res, err = itr.Next(ctx) + require.NoError(t, err) + + assert.Equal(t, 2, res) + + res, err = itr.Next(ctx) + require.Error(t, err) + + assert.Equal(t, iter.Done, err) + assert.Zero(t, res) +} + +func TestIter_Next(t *testing.T) { + itr := iter.Iter[int](func(context.Context) (int, error) { + return 1, nil + }) + + ctx := context.Background() + + res1, _ := itr(ctx) + res2, _ := itr.Next(ctx) + + assert.Equal(t, res1, res2) +} + +func TestIter_Take(t *testing.T) { + itr := iter.Iter[int](func(context.Context) (int, error) { + return 1, nil + }) + + ctx := context.Background() + + res, _ := itr.Take(ctx, 3) + if assert.Len(t, res, 3) { + assert.Equal(t, []int{1, 1, 1}, res) + } +} + +func TestIter_Take_Error(t *testing.T) { + var count int + itr := iter.Iter[int](func(context.Context) (int, error) { + if count > 1 { + return 0, errors.New("an error") + } + count++ + return 1, nil + }) + + ctx := context.Background() + + res, err := itr.Take(ctx, 3) + + if assert.Error(t, err) { + assert.EqualError(t, err, "an error") + } + if assert.Len(t, res, 2) { + assert.Equal(t, []int{1, 1}, res) + } +} + +func TestIter_Range(t *testing.T) { + itr := iter.Range(1, 5, func(_ context.Context, idx int) (int, error) { + return idx, nil + }) + + var res int + err := itr.Range(context.Background(), func(_ context.Context, i int) error { + res += i + return nil + }) + require.NoError(t, err) + + assert.Equal(t, 15, res) +} diff --git a/axiom/query/result.go b/axiom/query/result.go index fdd2be31..9cdf8525 100644 --- a/axiom/query/result.go +++ b/axiom/query/result.go @@ -1,8 +1,11 @@ package query import ( + "context" "encoding/json" "time" + + "github.com/axiomhq/axiom-go/axiom/query/iter" ) // Result is the result of an APL query. @@ -43,6 +46,11 @@ type Table struct { Columns []Column `json:"columns"` } +// Rows returns an iterator over the rows build from the columns the table. +func (t Table) Rows() iter.Iter[Row] { + return Rows(t.Columns) +} + // Field in a [Table]. type Field struct { // Name of the field. @@ -110,6 +118,13 @@ type BucketInfo struct { // Column in a [Table] containing the raw values of a [Field]. type Column []any +// Values returns an iterator over the values of the column. +func (c Column) Values() iter.Iter[any] { + return iter.Slice(c, func(_ context.Context, v any) (any, error) { + return v, nil + }) +} + // Status of an APL query [Result]. type Status struct { // MinCursor is the id of the oldest row, as seen server side. May be lower diff --git a/axiom/query/row.go b/axiom/query/row.go new file mode 100644 index 00000000..207041f6 --- /dev/null +++ b/axiom/query/row.go @@ -0,0 +1,41 @@ +package query + +import ( + "context" + + "github.com/axiomhq/axiom-go/axiom/query/iter" +) + +// Row represents a single row of a tabular query [Result]. +type Row []any + +// Values returns an iterator over the values of the row. +func (r Row) Values() iter.Iter[any] { + return iter.Slice(r, func(_ context.Context, v any) (any, error) { + return v, nil + }) +} + +// Rows returns an iterator over the rows build from the columns of a tabular +// query [Result]. +func Rows(columns []Column) iter.Iter[Row] { + // Return an empty iterator if there are no columns or column values. + if len(columns) == 0 || len(columns[0]) == 0 { + return func(context.Context) (Row, error) { + return nil, iter.Done + } + } + + return iter.Range(0, len(columns[0]), func(_ context.Context, idx int) (Row, error) { + if idx >= len(columns[0]) { + return nil, iter.Done + } + + row := make(Row, len(columns)) + for columnIdx, column := range columns { + row[columnIdx] = column[idx] + } + + return row, nil + }) +} diff --git a/axiom/query/row_test.go b/axiom/query/row_test.go new file mode 100644 index 00000000..2e9c6a7a --- /dev/null +++ b/axiom/query/row_test.go @@ -0,0 +1,54 @@ +package query_test + +import ( + "context" + "fmt" + "log" + "strings" + + "github.com/axiomhq/axiom-go/axiom/query" + "github.com/axiomhq/axiom-go/axiom/query/iter" +) + +func ExampleRows() { + columns := []query.Column{ + []any{ + "2020-11-19T11:06:31.569475746Z", + "2020-11-19T11:06:31.569479846Z", + }, + []any{ + "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)", + "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)", + }, + []any{ + "93.180.71.3", + "93.180.71.3", + }, + []any{ + "GET /downloads/product_1 HTTP/1.1", + "GET /downloads/product_1 HTTP/1.1", + }, + []any{ + 304, + 304, + }, + } + + rows := query.Rows(columns) + buf := new(strings.Builder) + + for { + row, err := rows.Next(context.Background()) + if err == iter.Done { + break + } else if err != nil { + log.Fatal(err) + } + fmt.Fprintln(buf, row) + } + + // Output: + // [2020-11-19T11:06:31.569475746Z Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21) 93.180.71.3 GET /downloads/product_1 HTTP/1.1 304] + // [2020-11-19T11:06:31.569479846Z Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21) 93.180.71.3 GET /downloads/product_1 HTTP/1.1 304] + fmt.Print(buf.String()) +} diff --git a/examples/README.md b/examples/README.md index da0c8146..5b0f942d 100644 --- a/examples/README.md +++ b/examples/README.md @@ -12,7 +12,7 @@ go run ./{example} Axiom Go and the adapters automatically pick up their configuration from the environment, if not otherwise specified. To learn more about configuration, check the -[documentation](https://pkg.go.dev/github.com/axiomhq/axiom-go/adapters). +[documentation](https://pkg.go.dev/github.com/axiomhq/axiom-go). To quickstart, export the environment variables below. diff --git a/examples/query/main.go b/examples/query/main.go index 807f64b5..a9f5fb29 100644 --- a/examples/query/main.go +++ b/examples/query/main.go @@ -9,6 +9,7 @@ import ( "os" "github.com/axiomhq/axiom-go/axiom" + "github.com/axiomhq/axiom-go/axiom/query" ) func main() { @@ -19,6 +20,8 @@ func main() { log.Fatal("AXIOM_DATASET is required") } + ctx := context.Background() + // 1. Initialize the Axiom API client. client, err := axiom.NewClient() if err != nil { @@ -27,15 +30,21 @@ func main() { // 2. Query all events using APL ⚡ apl := fmt.Sprintf("['%s']", dataset) // E.g. ['test'] - res, err := client.Query(context.Background(), apl) + res, err := client.Query(ctx, apl) if err != nil { log.Fatal(err) - } else if len(res.Matches) == 0 { + } else if res.Status.RowsMatched == 0 { log.Fatal("No matches found") } - // 3. Print the queried results. - for _, match := range res.Matches { - fmt.Println(match.Data) + // 3. Print the queried results by creating a iterator for the rows from the + // tabular query result (as it is organized in columns) and iterating over + // the rows. + rows := res.Tables[0].Rows() + if err := rows.Range(ctx, func(_ context.Context, row query.Row) error { + _, err := fmt.Println(row) + return err + }); err != nil { + log.Fatal(err) } }