Skip to content

Commit

Permalink
Add QueryConversionHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
andresmgot committed Sep 4, 2024
1 parent b6bc9e2 commit ad1d39b
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 16 deletions.
28 changes: 25 additions & 3 deletions backend/data_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,31 @@ import (

// dataSDKAdapter adapter between low level plugin protocol and SDK interfaces.
type dataSDKAdapter struct {
queryDataHandler QueryDataHandler
queryDataHandler QueryDataHandler
queryConversionHandler QueryConversionHandler // Optional
}

func newDataSDKAdapter(handler QueryDataHandler) *dataSDKAdapter {
func newDataSDKAdapter(handler QueryDataHandler, queryConversionHandler QueryConversionHandler) *dataSDKAdapter {
return &dataSDKAdapter{
queryDataHandler: handler,
queryDataHandler: handler,
queryConversionHandler: queryConversionHandler,
}
}

func (a *dataSDKAdapter) ConvertQueryData(ctx context.Context, req *QueryDataRequest) (*QueryDataRequest, error) {
convertRequest := &QueryConversionRequest{
PluginContext: req.PluginContext,
Queries: req.Queries,
}
convertResponse, err := a.queryConversionHandler.ConvertQuery(ctx, convertRequest)
if err != nil {
return nil, err
}
req.Queries = convertResponse.Queries

return req, nil
}

func (a *dataSDKAdapter) QueryData(ctx context.Context, req *pluginv2.QueryDataRequest) (*pluginv2.QueryDataResponse, error) {
ctx = setupContext(ctx, EndpointQueryData)
parsedReq := FromProto().QueryDataRequest(req)
Expand All @@ -27,6 +43,12 @@ func (a *dataSDKAdapter) QueryData(ctx context.Context, req *pluginv2.QueryDataR
err := wrapHandler(ctx, parsedReq.PluginContext, func(ctx context.Context) (RequestStatus, error) {
ctx = withHeaderMiddleware(ctx, parsedReq.GetHTTPHeaders())
var innerErr error
if a.queryConversionHandler != nil && GrafanaConfigFromContext(ctx).FeatureToggles().IsEnabled("dsQueryConvert") {
parsedReq, innerErr = a.ConvertQueryData(ctx, parsedReq)
if innerErr != nil {
return RequestStatusError, innerErr
}
}
resp, innerErr = a.queryDataHandler.QueryData(ctx, parsedReq)

status := RequestStatusFromQueryDataResponse(resp, innerErr)
Expand Down
30 changes: 26 additions & 4 deletions backend/data_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestQueryData(t *testing.T) {
t.Run("When forward HTTP headers enabled should forward headers", func(t *testing.T) {
ctx := context.Background()
handler := newFakeDataHandlerWithOAuth()
adapter := newDataSDKAdapter(handler)
adapter := newDataSDKAdapter(handler, nil)
_, err := adapter.QueryData(ctx, &pluginv2.QueryDataRequest{
Headers: map[string]string{
"Authorization": "Bearer 123",
Expand All @@ -95,7 +95,7 @@ func TestQueryData(t *testing.T) {
t.Run("When forward HTTP headers disable should not forward headers", func(t *testing.T) {
ctx := context.Background()
handler := newFakeDataHandlerWithOAuth()
adapter := newDataSDKAdapter(handler)
adapter := newDataSDKAdapter(handler, nil)
_, err := adapter.QueryData(ctx, &pluginv2.QueryDataRequest{
Headers: map[string]string{
"Authorization": "Bearer 123",
Expand All @@ -122,7 +122,7 @@ func TestQueryData(t *testing.T) {
a := newDataSDKAdapter(QueryDataHandlerFunc(func(ctx context.Context, _ *QueryDataRequest) (*QueryDataResponse, error) {
require.Equal(t, tid, tenant.IDFromContext(ctx))
return NewQueryDataResponse(), nil
}))
}), nil)

ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{
tenant.CtxKey: tid,
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestQueryData(t *testing.T) {
a := newDataSDKAdapter(QueryDataHandlerFunc(func(ctx context.Context, _ *QueryDataRequest) (*QueryDataResponse, error) {
actualCtx = ctx
return tc.queryDataResponse, nil
}))
}), nil)
_, err := a.QueryData(context.Background(), &pluginv2.QueryDataRequest{
PluginContext: &pluginv2.PluginContext{},
})
Expand All @@ -229,6 +229,28 @@ func TestQueryData(t *testing.T) {
})
}
})

t.Run("When conversionHandler is defined", func(t *testing.T) {
oldQuery := &pluginv2.DataQuery{
TimeRange: &pluginv2.TimeRange{},
Json: []byte(`{"old":"value"}`),
}
a := newDataSDKAdapter(QueryDataHandlerFunc(func(_ context.Context, q *QueryDataRequest) (*QueryDataResponse, error) {
require.Len(t, q.Queries, 1)
// Assert that the query has been converted
require.Equal(t, string(`{"new":"value"}`), string(q.Queries[0].JSON))
return &QueryDataResponse{}, nil
}), ConvertQueryFunc(func(_ context.Context, req *QueryConversionRequest) (*QueryConversionResponse, error) {
require.Len(t, req.Queries, 1)
req.Queries[0].JSON = []byte(`{"new":"value"}`)
return &QueryConversionResponse{Queries: req.Queries}, nil
}))
_, err := a.QueryData(context.Background(), &pluginv2.QueryDataRequest{
PluginContext: &pluginv2.PluginContext{},
Queries: []*pluginv2.DataQuery{oldQuery},
})
require.NoError(t, err)
})
}

var finalRoundTripper = httpclient.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
Expand Down
17 changes: 11 additions & 6 deletions backend/datasource/manage.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type ManageOpts struct {

// Stateless conversion handler
ConversionHandler backend.ConversionHandler

// Stateless query conversion handler
QueryConversionHandler backend.QueryConversionHandler
}

// Manage starts serving the data source over gPRC with automatic instance management.
Expand All @@ -45,11 +48,13 @@ func Manage(pluginID string, instanceFactory InstanceFactoryFunc, opts ManageOpt
}
handler := automanagement.NewManager(NewInstanceManager(instanceFactory))
return backend.Manage(pluginID, backend.ServeOpts{
CheckHealthHandler: handler,
CallResourceHandler: handler,
QueryDataHandler: handler,
StreamHandler: handler,
AdmissionHandler: opts.AdmissionHandler,
GRPCSettings: opts.GRPCSettings,
CheckHealthHandler: handler,
CallResourceHandler: handler,
QueryDataHandler: handler,
StreamHandler: handler,
AdmissionHandler: opts.AdmissionHandler,
GRPCSettings: opts.GRPCSettings,
ConversionHandler: opts.ConversionHandler,
QueryConversionHandler: opts.QueryConversionHandler,
})
}
2 changes: 1 addition & 1 deletion backend/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestContextualLogger(t *testing.T) {
checkCtxLogger(ctx, t, map[string]any{"endpoint": "queryData", "pluginID": pluginID})
run <- struct{}{}
return NewQueryDataResponse(), nil
}))
}), nil)
_, err := a.QueryData(context.Background(), &pluginv2.QueryDataRequest{
PluginContext: pCtx,
})
Expand Down
32 changes: 32 additions & 0 deletions backend/query_conversion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package backend

import (
"context"
)

// QueryConversionHandler is an EXPERIMENTAL service that allows converting queries between versions

type QueryConversionHandler interface {
// ConvertQuery is called to covert queries between different versions
ConvertQuery(context.Context, *QueryConversionRequest) (*QueryConversionResponse, error)
}

type ConvertQueryFunc func(context.Context, *QueryConversionRequest) (*QueryConversionResponse, error)

// ConvertObjects calls fn(ctx, req).
func (fn ConvertQueryFunc) ConvertQuery(ctx context.Context, req *QueryConversionRequest) (*QueryConversionResponse, error) {
return fn(ctx, req)
}

// ConversionRequest supports converting an object from on version to another
type QueryConversionRequest struct {
// NOTE: this may not include app or datasource instance settings depending on the request
PluginContext PluginContext `json:"pluginContext,omitempty"`
// Queries to convert. This contains the full metadata envelope.
Queries []DataQuery `json:"objects,omitempty"`
}

type QueryConversionResponse struct {
// Converted queries.
Queries []DataQuery `json:"objects,omitempty"`
}
8 changes: 6 additions & 2 deletions backend/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ type ServeOpts struct {
// This is EXPERIMENTAL and is a subject to change till Grafana 12
ConversionHandler ConversionHandler

// QueryConversionHandler converts queries between versions
// This is EXPERIMENTAL and is a subject to change till Grafana 12
QueryConversionHandler QueryConversionHandler

// GRPCSettings settings for gPRC.
GRPCSettings GRPCSettings
}
Expand All @@ -79,7 +83,7 @@ func GRPCServeOpts(opts ServeOpts) grpcplugin.ServeOpts {
}

if opts.QueryDataHandler != nil {
pluginOpts.DataServer = newDataSDKAdapter(opts.QueryDataHandler)
pluginOpts.DataServer = newDataSDKAdapter(opts.QueryDataHandler, opts.QueryConversionHandler)
}

if opts.StreamHandler != nil {
Expand All @@ -90,7 +94,7 @@ func GRPCServeOpts(opts ServeOpts) grpcplugin.ServeOpts {
pluginOpts.AdmissionServer = newAdmissionSDKAdapter(opts.AdmissionHandler)
}

if opts.ConversionHandler != nil {
if opts.ConversionHandler != nil || opts.QueryConversionHandler != nil {
pluginOpts.ConversionServer = newConversionSDKAdapter(opts.ConversionHandler)
}
return pluginOpts
Expand Down

0 comments on commit ad1d39b

Please sign in to comment.