diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 53e602db..a794e3de 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -112,6 +112,7 @@ jobs: AXIOM_URL: ${{ secrets[format('TESTING_{0}_API_URL', matrix.slug)] }} AXIOM_TOKEN: ${{ secrets[format('TESTING_{0}_TOKEN', matrix.slug)] }} AXIOM_ORG_ID: ${{ secrets[format('TESTING_{0}_ORG_ID', matrix.slug)] }} + AXIOM_SIGNING_KEY: ${{ secrets[format('TESTING_{0}_SHARED_ACCESS_SIGNING_KEY', matrix.slug)] }} AXIOM_DATASET_SUFFIX: ${{ github.run_id }}-${{ matrix.go }} TELEMETRY_TRACES_URL: ${{ secrets.TELEMETRY_TRACES_URL }} TELEMETRY_TRACES_TOKEN: ${{ secrets.TELEMETRY_TRACES_TOKEN }} @@ -126,7 +127,7 @@ jobs: - name: Cleanup (On Test Failure) if: failure() run: | - curl -sL $(curl -s https://api.github.com/repos/axiomhq/cli/releases/tags/v0.10.0 | grep "http.*linux_amd64.tar.gz" | awk '{print $2}' | sed 's|[\"\,]*||g') | tar xzvf - --strip-components=1 --wildcards -C /usr/local/bin "axiom_*_linux_amd64/axiom" + curl -sL $(curl -s https://api.github.com/repos/axiomhq/cli/releases/latest | grep "http.*linux_amd64.tar.gz" | awk '{print $2}' | sed 's|[\"\,]*||g') | tar xzvf - --strip-components=1 --wildcards -C /usr/local/bin "axiom_*_linux_amd64/axiom" axiom dataset list -f=json | jq '.[] | select(.id | contains("${{ github.run_id }}-${{ matrix.go }}")).id' | xargs -r -n1 axiom dataset delete -f ci-pass: diff --git a/.github/workflows/push.yaml b/.github/workflows/push.yaml index 58c1e287..03fc355f 100644 --- a/.github/workflows/push.yaml +++ b/.github/workflows/push.yaml @@ -78,6 +78,7 @@ jobs: AXIOM_URL: ${{ secrets[format('TESTING_{0}_API_URL', matrix.slug)] }} AXIOM_TOKEN: ${{ secrets[format('TESTING_{0}_TOKEN', matrix.slug)] }} AXIOM_ORG_ID: ${{ secrets[format('TESTING_{0}_ORG_ID', matrix.slug)] }} + AXIOM_SIGNING_KEY: ${{ secrets[format('TESTING_{0}_SHARED_ACCESS_SIGNING_KEY', matrix.slug)] }} AXIOM_DATASET_SUFFIX: ${{ github.run_id }}-${{ matrix.go }} TELEMETRY_TRACES_URL: ${{ secrets.TELEMETRY_TRACES_URL }} TELEMETRY_TRACES_TOKEN: ${{ secrets.TELEMETRY_TRACES_TOKEN }} @@ -91,5 +92,5 @@ jobs: - name: Cleanup (On Test Failure) if: failure() run: | - curl -sL $(curl -s https://api.github.com/repos/axiomhq/cli/releases/tags/v0.10.0 | grep "http.*linux_amd64.tar.gz" | awk '{print $2}' | sed 's|[\"\,]*||g') | tar xzvf - --strip-components=1 --wildcards -C /usr/local/bin "axiom_*_linux_amd64/axiom" + curl -sL $(curl -s https://api.github.com/repos/axiomhq/cli/releases/latest | grep "http.*linux_amd64.tar.gz" | awk '{print $2}' | sed 's|[\"\,]*||g') | tar xzvf - --strip-components=1 --wildcards -C /usr/local/bin "axiom_*_linux_amd64/axiom" axiom dataset list -f=json | jq '.[] | select(.id | contains("${{ github.run_id }}-${{ matrix.go }}")).id' | xargs -r -n1 axiom dataset delete -f diff --git a/.github/workflows/server_regression.yaml b/.github/workflows/server_regression.yaml index b093c4fa..395c9e9b 100644 --- a/.github/workflows/server_regression.yaml +++ b/.github/workflows/server_regression.yaml @@ -38,6 +38,7 @@ jobs: AXIOM_URL: ${{ secrets[format('TESTING_{0}_API_URL', matrix.slug)] }} AXIOM_TOKEN: ${{ secrets[format('TESTING_{0}_TOKEN', matrix.slug)] }} AXIOM_ORG_ID: ${{ secrets[format('TESTING_{0}_ORG_ID', matrix.slug)] }} + AXIOM_SIGNING_KEY: ${{ secrets[format('TESTING_{0}_SHARED_ACCESS_SIGNING_KEY', matrix.slug)] }} AXIOM_DATASET_SUFFIX: ${{ github.run_id }}-${{ matrix.go }} TELEMETRY_TRACES_URL: ${{ secrets.TELEMETRY_TRACES_URL }} TELEMETRY_TRACES_TOKEN: ${{ secrets.TELEMETRY_TRACES_TOKEN }} 
@@ -51,5 +52,5 @@ jobs: - name: Cleanup (On Test Failure) if: failure() run: | - curl -sL $(curl -s https://api.github.com/repos/axiomhq/cli/releases/tags/v0.10.0 | grep "http.*linux_amd64.tar.gz" | awk '{print $2}' | sed 's|[\"\,]*||g') | tar xzvf - --strip-components=1 --wildcards -C /usr/local/bin "axiom_*_linux_amd64/axiom" + curl -sL $(curl -s https://api.github.com/repos/axiomhq/cli/releases/latest | grep "http.*linux_amd64.tar.gz" | awk '{print $2}' | sed 's|[\"\,]*||g') | tar xzvf - --strip-components=1 --wildcards -C /usr/local/bin "axiom_*_linux_amd64/axiom" axiom dataset list -f=json | jq '.[] | select(.id | contains("${{ github.run_id }}-${{ matrix.go }}")).id' | xargs -r -n1 axiom dataset delete -f diff --git a/.github/workflows/test_examples.yaml b/.github/workflows/test_examples.yaml index 1c7f1299..690c6c29 100644 --- a/.github/workflows/test_examples.yaml +++ b/.github/workflows/test_examples.yaml @@ -40,6 +40,7 @@ jobs: - oteltraces - query - querylegacy + - sas - slog - zap - zerolog @@ -87,6 +88,7 @@ jobs: AXIOM_URL: ${{ secrets.TESTING_STAGING_API_URL }} AXIOM_TOKEN: ${{ secrets.TESTING_STAGING_TOKEN }} AXIOM_ORG_ID: ${{ secrets.TESTING_STAGING_ORG_ID }} + AXIOM_SIGNING_KEY: ${{ secrets.TESTING_STAGING_SHARED_ACCESS_SIGNING_KEY }} AXIOM_DATASET: test-axiom-go-examples-${{ github.run_id }}-${{ matrix.example }} steps: - uses: actions/checkout@v4 @@ -95,7 +97,7 @@ jobs: go-version-file: go.mod - name: Setup test dataset run: | - curl -sL $(curl -s https://api.github.com/repos/axiomhq/cli/releases/tags/v0.10.0 | grep "http.*linux_amd64.tar.gz" | awk '{print $2}' | sed 's|[\"\,]*||g') | tar xzvf - --strip-components=1 --wildcards -C /usr/local/bin "axiom_*_linux_amd64/axiom" + curl -sL $(curl -s https://api.github.com/repos/axiomhq/cli/releases/latest | grep "http.*linux_amd64.tar.gz" | awk '{print $2}' | sed 's|[\"\,]*||g') | tar xzvf - --strip-components=1 --wildcards -C /usr/local/bin "axiom_*_linux_amd64/axiom" axiom dataset create -n=$AXIOM_DATASET -d="Axiom Go ${{ matrix.example }} example test" - name: Setup example if: matrix.setup diff --git a/adapters/zap/zap_integration_test.go b/adapters/zap/zap_integration_test.go index 0ae4f72f..e4e1b5da 100644 --- a/adapters/zap/zap_integration_test.go +++ b/adapters/zap/zap_integration_test.go @@ -22,10 +22,7 @@ func Test(t *testing.T) { require.NoError(t, err) logger := zap.New(core) - defer func() { - err := logger.Sync() - assert.NoError(t, err) - }() + defer func() { assert.NoError(t, logger.Sync()) }() logger.Info("This is awesome!", zap.String("mood", "hyped")) logger.Warn("This is no that awesome...", zap.String("mood", "worried")) diff --git a/axiom/client.go b/axiom/client.go index 0a7de28c..4a804952 100644 --- a/axiom/client.go +++ b/axiom/client.go @@ -48,7 +48,7 @@ const ( otelTracerName = "github.com/axiomhq/axiom-go/axiom" ) -var validOnlyAPITokenPaths = regexp.MustCompile(`^/v1/datasets/([^/]+/(ingest|query)|_apl)(\?.+)?$`) +var validAPITokenPaths = regexp.MustCompile(`^/v1/datasets/([^/]+/(ingest|query)|_apl)(\?.+)?$`) // service is the base service used by all Axiom API services. 
type service struct { @@ -194,7 +194,7 @@ func (c *Client) NewRequest(ctx context.Context, method, path string, body any) } endpoint := c.config.BaseURL().ResolveReference(rel) - if config.IsAPIToken(c.config.Token()) && !validOnlyAPITokenPaths.MatchString(endpoint.Path) { + if config.IsAPIToken(c.config.Token()) && !validAPITokenPaths.MatchString(endpoint.Path) { return nil, ErrUnprivilegedToken } diff --git a/axiom/client_test.go b/axiom/client_test.go index 97d9aa80..a566d8b7 100644 --- a/axiom/client_test.go +++ b/axiom/client_test.go @@ -692,7 +692,7 @@ func TestAPITokenPathRegex(t *testing.T) { } for _, tt := range tests { t.Run(tt.input, func(t *testing.T) { - assert.Equal(t, tt.match, validOnlyAPITokenPaths.MatchString(tt.input)) + assert.Equal(t, tt.match, validAPITokenPaths.MatchString(tt.input)) }) } } diff --git a/axiom/datasets.go b/axiom/datasets.go index 47240ecf..6046ccaa 100644 --- a/axiom/datasets.go +++ b/axiom/datasets.go @@ -260,7 +260,7 @@ func (s *DatasetsService) Delete(ctx context.Context, id string) error { )) defer span.End() - path, err := url.JoinPath(s.basePath, "/", id) + path, err := url.JoinPath(s.basePath, id) if err != nil { return spanError(span, err) } @@ -533,9 +533,7 @@ func (s *DatasetsService) IngestChannel(ctx context.Context, id string, events < defer t.Stop() var ingestStatus ingest.Status - defer func() { - setIngestResultOnSpan(span, ingestStatus) - }() + defer func() { setIngestResultOnSpan(span, ingestStatus) }() flush := func() error { if len(batch) == 0 { @@ -556,7 +554,6 @@ func (s *DatasetsService) IngestChannel(ctx context.Context, id string, events < for { select { case <-ctx.Done(): - return &ingestStatus, spanError(span, context.Cause(ctx)) case event, ok := <-events: if !ok { @@ -596,9 +593,10 @@ func (s *DatasetsService) Query(ctx context.Context, apl string, options ...quer ctx, span := s.client.trace(ctx, "Datasets.Query", trace.WithAttributes( attribute.String("axiom.param.apl", apl), - attribute.String("axiom.param.start_time", opts.StartTime.String()), - attribute.String("axiom.param.end_time", opts.EndTime.String()), + attribute.String("axiom.param.start_time", opts.StartTime), + attribute.String("axiom.param.end_time", opts.EndTime), attribute.String("axiom.param.cursor", opts.Cursor), + attribute.Bool("axiom.param.include_cursor", opts.IncludeCursor), )) defer span.End() @@ -648,7 +646,10 @@ func (s *DatasetsService) Query(ctx context.Context, apl string, options ...quer // the future. Use [DatasetsService.Query] instead. 
func (s *DatasetsService) QueryLegacy(ctx context.Context, id string, q querylegacy.Query, opts querylegacy.Options) (*querylegacy.Result, error) { ctx, span := s.client.trace(ctx, "Datasets.QueryLegacy", trace.WithAttributes( - attribute.String("axiom.dataset_id", id), + attribute.String("axiom.param.dataset_id", id), + attribute.String("axiom.param.streaming_duration", opts.StreamingDuration.String()), + attribute.Bool("axiom.param.no_cache", opts.NoCache), + attribute.String("axiom.param.save_kind", opts.SaveKind.String()), )) defer span.End() diff --git a/axiom/doc.go b/axiom/doc.go index f822446a..bae2a356 100644 --- a/axiom/doc.go +++ b/axiom/doc.go @@ -7,6 +7,7 @@ // import "github.com/axiomhq/axiom-go/axiom/otel" // When using OpenTelemetry // import "github.com/axiomhq/axiom-go/axiom/query" // When constructing APL queries // import "github.com/axiomhq/axiom-go/axiom/querylegacy" // When constructing legacy queries +// import "github.com/axiomhq/axiom-go/axiom/sas" // When using shared access // // Construct a new Axiom client, then use the various services on the client to // access different parts of the Axiom API. The package automatically takes its diff --git a/axiom/encoder_test.go b/axiom/encoder_test.go index e834aeed..bc60fd39 100644 --- a/axiom/encoder_test.go +++ b/axiom/encoder_test.go @@ -24,10 +24,7 @@ func TestGzipEncoder(t *testing.T) { gzr, err := gzip.NewReader(r) require.NoError(t, err) - defer func() { - closeErr := gzr.Close() - require.NoError(t, closeErr) - }() + defer func() { require.NoError(t, gzr.Close()) }() act, err := io.ReadAll(gzr) require.NoError(t, err) diff --git a/axiom/limit.go b/axiom/limit.go index 6d2936fa..a66f75be 100644 --- a/axiom/limit.go +++ b/axiom/limit.go @@ -69,12 +69,12 @@ func limitScopeFromString(s string) (ls LimitScope, err error) { type Limit struct { // Scope a limit is enforced for. Only present on rate limited requests. Scope LimitScope - // The maximum limit a client is limited to for a specified time window + // The maximum limit a client is limited to for a specified time range // which resets at the time indicated by [Limit.Reset]. Limit uint64 // The remaining count towards the maximum limit. Remaining uint64 - // The time at which the current limit time window will reset. + // The time at which the current limit time range will reset. Reset time.Time limitType limitType diff --git a/axiom/orgs.go b/axiom/orgs.go index a827edaf..81921898 100644 --- a/axiom/orgs.go +++ b/axiom/orgs.go @@ -256,7 +256,7 @@ func (s *OrganizationsService) List(ctx context.Context) ([]*Organization, error // Get an organization by id. func (s *OrganizationsService) Get(ctx context.Context, id string) (*Organization, error) { ctx, span := s.client.trace(ctx, "Organizations.Get", trace.WithAttributes( - attribute.String("axiom.dataset_id", id), + attribute.String("axiom.organization_id", id), )) defer span.End() diff --git a/axiom/query/options.go b/axiom/query/options.go index 242bbc13..c6aae76f 100644 --- a/axiom/query/options.go +++ b/axiom/query/options.go @@ -5,9 +5,9 @@ import "time" // Options specifies the optional parameters for a query. type Options struct { // StartTime for the interval to query. - StartTime time.Time `json:"startTime,omitempty"` + StartTime string `json:"startTime,omitempty"` // EndTime of the interval to query. - EndTime time.Time `json:"endTime,omitempty"` + EndTime string `json:"endTime,omitempty"` // Cursor to use for pagination. 
When used, don't specify new start and end // times but rather use the start and end times of the query that returned // the cursor that will be used. @@ -27,15 +27,15 @@ type Option func(*Options) // SetStartTime specifies the start time of the query interval. When also using // [SetCursor], please make sure to use the start time of the query that // returned the cursor that will be used. -func SetStartTime(startTime time.Time) Option { - return func(o *Options) { o.StartTime = startTime } +func SetStartTime[T time.Time | string](startTime T) Option { + return func(o *Options) { o.StartTime = timeOrStringToString(startTime) } } // SetEndTime specifies the end time of the query interval. When also using // [SetCursor], please make sure to use the end time of the query that returned // the cursor that will be used. -func SetEndTime(endTime time.Time) Option { - return func(o *Options) { o.EndTime = endTime } +func SetEndTime[T time.Time | string](endTime T) Option { + return func(o *Options) { o.EndTime = timeOrStringToString(endTime) } } // SetCursor specifies the cursor of the query. If include is set to true the @@ -65,3 +65,13 @@ func SetVariable(name string, value any) Option { func SetVariables(variables map[string]any) Option { return func(o *Options) { o.Variables = variables } } + +func timeOrStringToString[T time.Time | string](t T) string { + switch t := any(t).(type) { + case time.Time: + return t.Format(time.RFC3339Nano) + case string: + return t + } + panic("time is neither time.Time nor string") +} diff --git a/axiom/query/options_test.go b/axiom/query/options_test.go index 4205a4b2..a6dcb64d 100644 --- a/axiom/query/options_test.go +++ b/axiom/query/options_test.go @@ -20,19 +20,37 @@ func TestOptions(t *testing.T) { { name: "set start time", options: []query.Option{ - query.SetStartTime(now), + query.SetStartTime(now.String()), }, want: query.Options{ - StartTime: now, + StartTime: now.String(), }, }, { name: "set end time", options: []query.Option{ - query.SetEndTime(now), + query.SetEndTime(now.String()), }, want: query.Options{ - EndTime: now, + EndTime: now.String(), + }, + }, + { + name: "set start time (apl)", + options: []query.Option{ + query.SetStartTime("now"), + }, + want: query.Options{ + StartTime: "now", + }, + }, + { + name: "set end time (apl)", + options: []query.Option{ + query.SetEndTime("now"), + }, + want: query.Options{ + EndTime: "now", }, }, { diff --git a/axiom/sas/doc.go b/axiom/sas/doc.go new file mode 100644 index 00000000..03069062 --- /dev/null +++ b/axiom/sas/doc.go @@ -0,0 +1,16 @@ +// Package sas implements functionality for creating and verifying shared access +// signatures (SAS) and shared access tokens (SAT) as well as using them to +// query Axiom datasets. A SAS grants querying capabilities to a dataset for a +// given time range and with a global filter applied on behalf of an +// organization. A SAS is a URL query string composed of a set of query +// parameters that make up the payload for a signature and the cryptographic +// signature itself, the SAT. +// +// Usage: +// +// import "github.com/axiomhq/axiom-go/axiom/sas" +// +// To create a SAS, that can be attached to a query request, use the +// high-level [Create] function. The returned [Options] can be attached to a +// [http.Request] or encoded to a query string by calling [Options.Encode]. 
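+//
+// A minimal sketch of that flow (the signing key and all parameter values
+// below are placeholders, not defaults):
+//
+//	options, err := sas.Create(os.Getenv("AXIOM_SIGNING_KEY"), sas.Params{
+//		OrganizationID: "my-org",
+//		Dataset:        "logs",
+//		Filter:         `customer == "vercel"`,
+//		MinStartTime:   "ago(1h)",
+//		MaxEndTime:     "now",
+//		ExpiryTime:     "endofday(now)",
+//	})
+//	if err != nil {
+//		// Handle the error.
+//	}
+//
+//	// Either encode the options into a query string that can be passed on...
+//	signature, err := options.Encode()
+//
+//	// ...or attach them directly to an existing *http.Request.
+//	err = options.Attach(req)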
+package sas diff --git a/axiom/sas/options.go b/axiom/sas/options.go new file mode 100644 index 00000000..8a6816ec --- /dev/null +++ b/axiom/sas/options.go @@ -0,0 +1,84 @@ +package sas + +import ( + "errors" + "net/http" + "net/url" + + "github.com/google/go-querystring/query" +) + +// The parameter names for the shared access signature query string. +const ( + queryOrgID = "oi" + queryDataset = "dt" + queryFilter = "fl" + queryMinStartTime = "mst" + queryMaxEndTime = "met" + queryExpiryTime = "exp" + queryToken = "tk" +) + +// Options are the url query parameters used to authenticate a query request. +type Options struct { + Params + + // Token is the signature created from the other fields in the options. + Token string `url:"tk"` +} + +// Decode the given signature into a set of options. +func Decode(signature string) (Options, error) { + q, err := url.ParseQuery(signature) + if err != nil { + return Options{}, err + } + + options := Options{ + Params: Params{ + OrganizationID: q.Get(queryOrgID), + Dataset: q.Get(queryDataset), + Filter: q.Get(queryFilter), + MinStartTime: q.Get(queryMinStartTime), + MaxEndTime: q.Get(queryMaxEndTime), + ExpiryTime: q.Get(queryExpiryTime), + }, + Token: q.Get(queryToken), + } + + // Validate that the params are valid and the token is present. + if err := options.Params.Validate(); err != nil { + return options, err + } else if options.Token == "" { + return options, errors.New("missing token") + } + + return options, nil +} + +// Attach the options to the given request as a query string. Existing query +// parameters are retained unless they are overwritten by the key of one of the +// options. +func (o Options) Attach(req *http.Request) error { + q, err := query.Values(o) + if err != nil { + return err + } + + qc := req.URL.Query() + for k := range q { + qc.Set(k, q.Get(k)) + } + req.URL.RawQuery = qc.Encode() + + return nil +} + +// Encode the options into a url query string. 
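+//
+// For example, the encoded query string can be appended to the URL of an APL
+// query request (the URL below is illustrative):
+//
+//	signature, _ := options.Encode()
+//	queryURL := "https://api.axiom.co/v1/datasets/_apl?format=legacy&" + signature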
+func (o Options) Encode() (string, error) { + q, err := query.Values(o) + if err != nil { + return "", err + } + return q.Encode(), nil +} diff --git a/axiom/sas/options_test.go b/axiom/sas/options_test.go new file mode 100644 index 00000000..3ad76874 --- /dev/null +++ b/axiom/sas/options_test.go @@ -0,0 +1,72 @@ +package sas + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) + +func TestOptions_Decode(t *testing.T) { + options, err := Decode("dt=logs&exp=now&fl=customer+%3D%3D+%22vercel%22&met=now&mst=ago%281h%29&oi=axiom&tk=zdLDQdmMUIz1glTnQUCVJpYdZSIRLIPAj-c-y8zqph0%3D") + require.NoError(t, err) + require.NotEmpty(t, options) + + assert.Equal(t, Options{ + Params: Params{ + OrganizationID: "axiom", + Dataset: "logs", + Filter: `customer == "vercel"`, + MinStartTime: "ago(1h)", + MaxEndTime: "now", + ExpiryTime: "now", + }, + Token: "zdLDQdmMUIz1glTnQUCVJpYdZSIRLIPAj-c-y8zqph0=", + }, options) +} + +func TestOptions_Attach(t *testing.T) { + options := Options{ + Params: Params{ + OrganizationID: "axiom", + Dataset: "logs", + Filter: `customer == "vercel"`, + MinStartTime: "ago(1h)", + MaxEndTime: "now", + ExpiryTime: "now", + }, + Token: "zdLDQdmMUIz1glTnQUCVJpYdZSIRLIPAj-c-y8zqph0=", + } + + req := httptest.NewRequest(http.MethodPost, "/v1/datasets/_apl", nil) + + err := options.Attach(req) + require.NoError(t, err) + + parsedOptions, err := Decode(req.URL.RawQuery) + require.NoError(t, err) + + assert.Equal(t, options, parsedOptions) +} + +func TestOptions_Encode(t *testing.T) { + options := Options{ + Params: Params{ + OrganizationID: "axiom", + Dataset: "logs", + Filter: `customer == "vercel"`, + MinStartTime: "ago(1h)", + MaxEndTime: "now", + ExpiryTime: "now", + }, + Token: "zdLDQdmMUIz1glTnQUCVJpYdZSIRLIPAj-c-y8zqph0=", + } + + s, err := options.Encode() + require.NoError(t, err) + require.NotEmpty(t, s) + + assert.Equal(t, "dt=logs&exp=now&fl=customer+%3D%3D+%22vercel%22&met=now&mst=ago%281h%29&oi=axiom&tk=zdLDQdmMUIz1glTnQUCVJpYdZSIRLIPAj-c-y8zqph0%3D", s) +} diff --git a/axiom/sas/params.go b/axiom/sas/params.go new file mode 100644 index 00000000..05863763 --- /dev/null +++ b/axiom/sas/params.go @@ -0,0 +1,96 @@ +package sas + +import ( + "crypto/hmac" + "crypto/sha256" + "encoding/base64" + "errors" + "fmt" + "hash" ) + +var ( + keyCodec = base64.StdEncoding + tokenCodec = base64.RawURLEncoding ) + +// Params represents the parameters for creating a shared access token or a +// shared access signature for a query request. +type Params struct { + // OrganizationID is the ID of the organization the token and signature are + // valid for. + OrganizationID string `url:"oi"` + // Dataset name the token and signature are valid for. + Dataset string `url:"dt"` + // Filter is the top-level query filter to apply to all query requests the + // token and signature are valid for. Must be a valid APL filter expression. + Filter string `url:"fl"` + // MinStartTime is the earliest query start time the token and signature are + // valid for. Can be a timestamp or an APL expression. + MinStartTime string `url:"mst"` + // MaxEndTime is the latest query end time the token and signature are valid + // for. Can be a timestamp or an APL expression. + MaxEndTime string `url:"met"` + // ExpiryTime is the time after which the token and signature are no longer + // valid. Can be a timestamp or an APL expression. + ExpiryTime string `url:"exp"` +} + +// Validate makes sure that all query parameters are provided.
+func (p Params) Validate() error { + if p.OrganizationID == "" { + return errors.New("organization ID is required") + } else if p.Dataset == "" { + return errors.New("dataset is required") + } else if p.Filter == "" { + return errors.New("filter is required") + } else if p.MinStartTime == "" { + return errors.New("minimum start time is required") + } else if p.MaxEndTime == "" { + return errors.New("maximum end time is required") + } else if p.ExpiryTime == "" { + return errors.New("expiry time is required") + } + return nil +} + +// sign the parameters with the given key and return a base64 encoded token. +func (p Params) sign(key string) (string, error) { + k, err := keyCodec.DecodeString(key) + if err != nil { + return "", fmt.Errorf("invalid key: %s", err) + } + + h := hmac.New(sha256.New, k) + if err = buildSignature(h, p); err != nil { + return "", fmt.Errorf("computing hmac: %s", err) + } + + return tokenCodec.EncodeToString(h.Sum(nil)), nil +} + +// buildSignature builds the payload for a shared access token and writes it to +// the given [hash.Hash] in order to compute the signature. The format for the +// payload is a simple, newline-delimited string composed of the following values +// in that order: organization ID, dataset name, filter, minimum start time, +// maximum end time, expiry time. +func buildSignature(h hash.Hash, params Params) error { + paramList := []string{ + params.OrganizationID, + params.Dataset, + params.Filter, + params.MinStartTime, + params.MaxEndTime, + params.ExpiryTime, + } + for idx, param := range paramList { + if _, err := h.Write([]byte(param)); err != nil { + return err + } else if idx < len(paramList)-1 { // Skip newline for last param. + if _, err = h.Write([]byte{'\n'}); err != nil { + return err + } + } + } + return nil +} diff --git a/axiom/sas/params_test.go b/axiom/sas/params_test.go new file mode 100644 index 00000000..4f5f45fa --- /dev/null +++ b/axiom/sas/params_test.go @@ -0,0 +1,62 @@ +package sas + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) + +func TestParams_Validate(t *testing.T) { + var params Params + + err := params.Validate() + assert.EqualError(t, err, "organization ID is required") + + params.OrganizationID = "axiom" + + err = params.Validate() + assert.EqualError(t, err, "dataset is required") + + params.Dataset = "logs" + + err = params.Validate() + assert.EqualError(t, err, "filter is required") + + params.Filter = `customer == "vercel"` + + err = params.Validate() + assert.EqualError(t, err, "minimum start time is required") + + params.MinStartTime = "ago(1h)" + + err = params.Validate() + assert.EqualError(t, err, "maximum end time is required") + + params.MaxEndTime = "now" + + err = params.Validate() + assert.EqualError(t, err, "expiry time is required") + + params.ExpiryTime = "now" + + err = params.Validate() + assert.NoError(t, err) +} + +func TestParams_sign(t *testing.T) { + params := Params{ + OrganizationID: "axiom", + Dataset: "logs", + Filter: `customer == "vercel"`, + MinStartTime: "ago(1h)", + MaxEndTime: "now", + ExpiryTime: "now", + } + + token, err := params.sign(testKeyStr) + require.NoError(t, err) + require.NotEmpty(t, token) + + assert.Equal(t, "zdLDQdmMUIz1glTnQUCVJpYdZSIRLIPAj-c-y8zqph0", token) +} diff --git a/axiom/sas/sas.go b/axiom/sas/sas.go new file mode 100644 index 00000000..044c002a --- /dev/null +++ b/axiom/sas/sas.go @@ -0,0 +1,23 @@ +package sas + +import "fmt" + +// Create creates the options that compose a shared access
signature signed with +// the given key and valid for the given parameters. The options can be encoded +// into a query string by calling [Options.Encode] and attached to a URL by +// calling [Options.Attach]. +func Create(key string, params Params) (Options, error) { + if err := params.Validate(); err != nil { + return Options{}, fmt.Errorf("invalid parameters: %s", err) + } + + token, err := params.sign(key) + if err != nil { + return Options{}, err + } + + return Options{ + Params: params, + Token: token, + }, nil +} diff --git a/axiom/sas/sas_integration_test.go b/axiom/sas/sas_integration_test.go new file mode 100644 index 00000000..91aa5b17 --- /dev/null +++ b/axiom/sas/sas_integration_test.go @@ -0,0 +1,194 @@ +//go:build integration + +package sas_test + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "os" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/axiomhq/axiom-go/axiom" + "github.com/axiomhq/axiom-go/axiom/ingest" + "github.com/axiomhq/axiom-go/axiom/query" + "github.com/axiomhq/axiom-go/axiom/sas" + "github.com/axiomhq/axiom-go/internal/config" + "github.com/axiomhq/axiom-go/internal/test/testhelper" +) + +const ingestData = `[ + { + "time": "17/May/2015:08:05:30 +0000", + "remote_ip": "93.180.71.1", + "remote_user": "-", + "request": "GET /downloads/product_1 HTTP/1.1", + "response": 304, + "bytes": 0, + "referrer": "-", + "agent": "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)" + }, + { + "time": "17/May/2015:08:05:31 +0000", + "remote_ip": "93.180.71.2", + "remote_user": "-", + "request": "GET /downloads/product_1 HTTP/1.1", + "response": 304, + "bytes": 0, + "referrer": "-", + "agent": "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)" + } +]` + +func TestSAS(t *testing.T) { + cfg := config.Default() + if err := cfg.IncorporateEnvironment(); err != nil { + t.Fatal(err) + } else if err = cfg.Validate(); err != nil { + t.Fatal(err) + } + + datasetSuffix := os.Getenv("AXIOM_DATASET_SUFFIX") + if datasetSuffix == "" { + datasetSuffix = "local" + } + + signingKey := os.Getenv("AXIOM_SIGNING_KEY") + if signingKey == "" { + t.Fatal("AXIOM_SIGNING_KEY must be set to a shared access signing key!") + } + + // Clear the environment to avoid unexpected behavior. + testhelper.SafeClearEnv(t) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + t.Cleanup(cancel) + + userAgent := fmt.Sprintf("axiom-go-sas-integration-test/%s", datasetSuffix) + client, err := axiom.NewClient( + axiom.SetNoEnv(), + axiom.SetURL(cfg.BaseURL().String()), + axiom.SetToken(cfg.Token()), + axiom.SetOrganizationID(cfg.OrganizationID()), + axiom.SetUserAgent(userAgent), + ) + require.NoError(t, err) + + // Get some info on the user that runs the test. + testUser, err := client.Users.Current(ctx) + require.NoError(t, err) + + t.Logf("using account %q", testUser.Name) + + // Create the dataset to use. + dataset, err := client.Datasets.Create(ctx, axiom.DatasetCreateRequest{ + Name: fmt.Sprintf("test-axiom-go-sas-%s", datasetSuffix), + Description: "This is a test dataset for adapter integration tests.", + }) + require.NoError(t, err) + t.Cleanup(func() { + // Restore token. + optsErr := client.Options(axiom.SetToken(cfg.Token())) + require.NoError(t, optsErr) + + teardownCtx := teardownContext(t, time.Second*15) + deleteErr := client.Datasets.Delete(teardownCtx, dataset.ID) + assert.NoError(t, deleteErr) + }) + + // Ingest some test data. 
+ ingestRes, err := client.Ingest(ctx, dataset.ID, strings.NewReader(ingestData), axiom.JSON, axiom.Identity) + require.NoError(t, err) + require.EqualValues(t, 2, ingestRes.Ingested) + + // Ingest one event each with an earlier timestamp that will break the query + // test if the signature's time range is not respected. + now := time.Now() + then := now.Add(-time.Hour) + ingestRes, err = client.IngestEvents(ctx, dataset.ID, []axiom.Event{ + { + ingest.TimestampField: then.Format(time.RFC3339Nano), + "remote_ip": "93.180.71.1", + }, + { + ingest.TimestampField: then.Format(time.RFC3339Nano), + "remote_ip": "93.180.71.2", + }, + }) + require.NoError(t, err) + require.EqualValues(t, 2, ingestRes.Ingested) + + options, err := sas.Create(signingKey, sas.Params{ + OrganizationID: cfg.OrganizationID(), + Dataset: dataset.ID, + Filter: `remote_ip == "93.180.71.1"`, + MinStartTime: "ago(5m)", + MaxEndTime: "now", + ExpiryTime: "endofday(now)", + }) + require.NoError(t, err) + require.NotEmpty(t, options) + + u := cfg.BaseURL().JoinPath("/v1/datasets/_apl") + q := u.Query() + q.Set("format", "legacy") + u.RawQuery = q.Encode() + + r := fmt.Sprintf(`{ + "apl": "['%s'] | count", + "startTime": "ago(1m)", + "endTime": "now" + }`, dataset.ID) + req, err := http.NewRequest(http.MethodPost, u.String(), strings.NewReader(r)) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + + require.NoError(t, options.Attach(req)) + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + t.Cleanup(func() { assert.NoError(t, resp.Body.Close()) }) + + require.Equal(t, http.StatusOK, resp.StatusCode, "unexpected status code") + + var res query.Result + require.NoError(t, json.NewDecoder(resp.Body).Decode(&res)) + + assert.EqualValues(t, 1, res.Buckets.Totals[0].Aggregations[0].Value) + + // Now try to query and bypass the time range via an APL 'where' statement.
+ r = fmt.Sprintf(`{ + "apl": "['%s'] | where _time > ago(1d) | count", + "startTime": "ago(1m)", + "endTime": "now" + }`, dataset.ID) + req, err = http.NewRequest(http.MethodPost, u.String(), strings.NewReader(r)) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + + require.NoError(t, options.Attach(req)) + + resp, err = http.DefaultClient.Do(req) + require.NoError(t, err) + t.Cleanup(func() { assert.NoError(t, resp.Body.Close()) }) + + require.Equal(t, http.StatusOK, resp.StatusCode, "unexpected status code") + + require.NoError(t, json.NewDecoder(resp.Body).Decode(&res)) + + assert.EqualValues(t, 1, res.Buckets.Totals[0].Aggregations[0].Value) +} + +func teardownContext(t *testing.T, timeout time.Duration) context.Context { + t.Helper() + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + t.Cleanup(cancel) + return ctx +} diff --git a/axiom/sas/sas_test.go b/axiom/sas/sas_test.go new file mode 100644 index 00000000..a8ddf5cb --- /dev/null +++ b/axiom/sas/sas_test.go @@ -0,0 +1,35 @@ +package sas + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) + +var testKeyStr = "aeyGXNKLbqpPhBHqjHnVr4FS+eJ1d3LsheK1M8k6054=" + +func TestCreate(t *testing.T) { + options, err := Create(testKeyStr, Params{ + OrganizationID: "axiom", + Dataset: "logs", + Filter: `customer == "vercel"`, + MinStartTime: "ago(1h)", + MaxEndTime: "now", + ExpiryTime: "now", + }) + require.NoError(t, err) + require.NotEmpty(t, options) + + assert.Equal(t, Options{ + Params: Params{ + OrganizationID: "axiom", + Dataset: "logs", + Filter: `customer == "vercel"`, + MinStartTime: "ago(1h)", + MaxEndTime: "now", + ExpiryTime: "now", + }, + Token: "zdLDQdmMUIz1glTnQUCVJpYdZSIRLIPAj-c-y8zqph0", + }, options) +} diff --git a/doc.go b/doc.go index ebae12a2..2d8007dc 100644 --- a/doc.go +++ b/doc.go @@ -7,6 +7,7 @@ // import "github.com/axiomhq/axiom-go/axiom/otel" // When using OpenTelemetry // import "github.com/axiomhq/axiom-go/axiom/query" // When constructing APL queries // import "github.com/axiomhq/axiom-go/axiom/querylegacy" // When constructing legacy queries +// import "github.com/axiomhq/axiom-go/axiom/sas" // When using shared access // // Construct a new Axiom client, then use the various services on the client to // access different parts of the Axiom API. The package automatically takes its diff --git a/examples/sas/main.go b/examples/sas/main.go new file mode 100644 index 00000000..7a23b9b5 --- /dev/null +++ b/examples/sas/main.go @@ -0,0 +1,121 @@ +// The purpose of this example is to show how to create a shared access +// signature (SAS) for a dataset and use it to query that dataset via an +// ordinary HTTP request. +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "net/http" + "net/url" + "os" + "strings" + + "github.com/axiomhq/axiom-go/axiom" + "github.com/axiomhq/axiom-go/axiom/query" + "github.com/axiomhq/axiom-go/axiom/sas" ) + +func main() { + // Export "AXIOM_DATASET" and "AXIOM_SIGNING_KEY" in addition to the required environment variables. + + dataset := os.Getenv("AXIOM_DATASET") + if dataset == "" { + log.Fatal("AXIOM_DATASET is required") + } + + signingKey := os.Getenv("AXIOM_SIGNING_KEY") + if signingKey == "" { + log.Fatal("AXIOM_SIGNING_KEY is required") + } + + // 1. Initialize the Axiom API client. + client, err := axiom.NewClient() + if err != nil { + log.Fatal(err) + } + + // 2. Ingest some events with different values for the "team_id" field.
+ events := []axiom.Event{ + {"team_id": "a", "value": 1}, + {"team_id": "a", "value": 2}, + {"team_id": "a", "value": 3}, + {"team_id": "b", "value": 4}, + {"team_id": "b", "value": 5}, + } + ingestRes, err := client.IngestEvents(context.Background(), dataset, events) + if err != nil { + log.Fatal(err) + } else if fails := len(ingestRes.Failures); fails > 0 { + log.Fatalf("Ingestion of %d events failed", fails) + } + + // 3. Create a shared access signature that limits query access to events + // by the "team_id" field to only those with the value "a". The query's time + // range is limited to the last 5 minutes. + options, err := sas.Create(signingKey, sas.Params{ + OrganizationID: os.Getenv("AXIOM_ORG_ID"), + Dataset: dataset, + Filter: `team_id == "a"`, + MinStartTime: "ago(5m)", + MaxEndTime: "now", + ExpiryTime: "now", + }) + if err != nil { + log.Fatal(err) + } + + // ❗From here on, assume the code is executed by a non-Axiom user who is + // delegated query access via the SAS handed to them on behalf of the + // organization. + + // 4. Construct the Axiom API URL for the APL query endpoint. + u := os.Getenv("AXIOM_URL") + if u == "" { + u = "https://api.axiom.co" + } + queryURL, err := url.JoinPath(u, "/v1/datasets/_apl") + if err != nil { + log.Fatal(err) + } + queryURL += "?format=legacy" // Currently, must be set to "legacy". + + // 5. Construct the APL query request. + r := fmt.Sprintf(`{ + "apl": "['%s'] | count", + "startTime": "ago(1m)", + "endTime": "now" + }`, dataset) + req, err := http.NewRequest(http.MethodPost, queryURL, strings.NewReader(r)) + if err != nil { + log.Fatal(err) + } + req.Header.Set("Content-Type", "application/json") + + // 6. Attach the SAS to the request. + if err = options.Attach(req); err != nil { + log.Fatal(err) + } + + // 7. Execute the request. + resp, err := http.DefaultClient.Do(req) + if err != nil { + log.Fatal(err) + } + defer resp.Body.Close() + + // 8. Check the response status code. + if code := resp.StatusCode; code != http.StatusOK { + log.Fatalf("unexpected status code: %d (%s)", code, http.StatusText(code)) + } + + // 9. Decode the response. + var res query.Result + if err = json.NewDecoder(resp.Body).Decode(&res); err != nil { + log.Fatal(err) + } + + // 10. Print the count, which should be "3". + fmt.Println(res.Buckets.Totals[0].Aggregations[0].Value) +} diff --git a/internal/config/config.go b/internal/config/config.go index 86e4835e..f6a76dd0 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -8,7 +8,7 @@ import ( // Config is the configuration for Axiom related functionality. It should never // be created manually but always via the [Default] function. type Config struct { - // baseURL of the Axiom instance. Defaults to [CloudURL]. + // baseURL of the Axiom instance. Defaults to [APIURL]. baseURL *url.URL // token is the authentication token that will be set as 'Bearer' on the // 'Authorization' header. It must be an api or a personal token. diff --git a/internal/config/token.go b/internal/config/token.go index e94c16be..9650f8e6 100644 --- a/internal/config/token.go +++ b/internal/config/token.go @@ -2,17 +2,20 @@ package config import "strings" -// IsAPIToken returns true if the given token is an API token. +// IsAPIToken returns true if the given token is an API token. It does not +// validate the token itself. func IsAPIToken(token string) bool { return strings.HasPrefix(token, "xaat-") } -// IsPersonalToken returns true if the given token is a personal token. +// IsPersonalToken returns true if the given token is a personal token.
It does +// not validate the token itself. func IsPersonalToken(token string) bool { return strings.HasPrefix(token, "xapt-") } -// IsValidToken returns true if the given token is a valid Axiom token. -func IsValidToken(token string) bool { - return IsAPIToken(token) || IsPersonalToken(token) +// IsValidToken returns true if the given credential is a valid Axiom +// token. It does not validate the token itself. +func IsValidToken(credential string) bool { + return IsAPIToken(credential) || IsPersonalToken(credential) } diff --git a/internal/test/testdata/testdata.go b/internal/test/testdata/testdata.go index c64cee00..10cdaa55 100644 --- a/internal/test/testdata/testdata.go +++ b/internal/test/testdata/testdata.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/klauspost/compress/gzip" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -17,9 +18,7 @@ var testdata []byte func Load(tb testing.TB) []byte { gzr, err := gzip.NewReader(bytes.NewReader(testdata)) require.NoError(tb, err) - defer func() { - require.NoError(tb, gzr.Close()) - }() + defer func() { assert.NoError(tb, gzr.Close()) }() b, err := io.ReadAll(gzr) require.NoError(tb, err)
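
As a usage sketch for the reworked query options above, SetStartTime and
SetEndTime accept either a time.Time or an APL time expression. The client
setup, dataset name, and query below are placeholders, not part of this change:

	// Assumes a configured *axiom.Client named client.
	res, err := client.Datasets.Query(ctx,
		"['logs'] | where response == 304 | count",
		query.SetStartTime(time.Now().Add(-time.Hour)), // time.Time, as before
		query.SetEndTime("now"),                        // APL expression, now also supported
	)
	if err != nil {
		log.Fatal(err)
	}
	_ = res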