Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add QueryConversionHandler #1071

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions backend/data_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,37 @@ import (
"fmt"

"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// dataSDKAdapter adapter between low level plugin protocol and SDK interfaces.
type dataSDKAdapter struct {
queryDataHandler QueryDataHandler
queryDataHandler QueryDataHandler
queryConversionHandler QueryConversionHandler
}

func newDataSDKAdapter(handler QueryDataHandler) *dataSDKAdapter {
func newDataSDKAdapter(handler QueryDataHandler, queryConversionHandler QueryConversionHandler) *dataSDKAdapter {
return &dataSDKAdapter{
queryDataHandler: handler,
queryDataHandler: handler,
queryConversionHandler: queryConversionHandler,
}
}

func (a *dataSDKAdapter) ConvertQueryData(ctx context.Context, req *QueryDataRequest) (*QueryDataRequest, error) {
convertRequest := &QueryConversionRequest{
PluginContext: req.PluginContext,
Queries: req.Queries,
}
convertResponse, err := a.queryConversionHandler.ConvertQuery(ctx, convertRequest)
if err != nil {
return nil, err
}
req.Queries = convertResponse.Queries

return req, nil
}

func (a *dataSDKAdapter) QueryData(ctx context.Context, req *pluginv2.QueryDataRequest) (*pluginv2.QueryDataResponse, error) {
ctx = setupContext(ctx, EndpointQueryData)
parsedReq := FromProto().QueryDataRequest(req)
Expand All @@ -27,6 +45,19 @@ func (a *dataSDKAdapter) QueryData(ctx context.Context, req *pluginv2.QueryDataR
err := wrapHandler(ctx, parsedReq.PluginContext, func(ctx context.Context) (RequestStatus, error) {
ctx = withHeaderMiddleware(ctx, parsedReq.GetHTTPHeaders())
var innerErr error
if a.queryConversionHandler != nil && GrafanaConfigFromContext(ctx).FeatureToggles().IsEnabled("dsQueryConvert") {
convertedQuery, innerErr := a.ConvertQueryData(ctx, parsedReq)
Copy link
Contributor

@marefr marefr Sep 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the example, https://github.com/grafana/grafana-plugin-examples/pull/340/files#diff-e608edd147405086920bcb097aae449ffb1ca927fea2ab583e93e9bde6b6ee93, this looks a bit costly resource wise to always call it if implemented. You would have to parse the JSON to figure out if it should be converted/migrated. Then after handling the query data in plugin you would have to parse the JSON again. Not optimal, but maybe not a big deal?

But from plugin development perspective I'm not sure how this helps me compared to having some conversion/migration happening within QueryData that you could do already today without these changes.

It feels like this could be useful if the SDK would handle everything related to figure out if conversion is needed using API version, plugin version or some version/incremented value. Maybe I've been out of the loop lately and missed where we're headed with this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, the goal here was to avoid people having to manually call the migrate function but that comes with the cost of the extra marshalling-unmarshalling. I am going to refactor it so people will need to call it manually.

if innerErr != nil {
if status.Code(innerErr) == codes.Unimplemented {
// The plugin does not implement query migration, disabling it
a.queryConversionHandler = nil
} else {
return RequestStatusError, innerErr
}
} else {
parsedReq = convertedQuery
}
}
resp, innerErr = a.queryDataHandler.QueryData(ctx, parsedReq)

status := RequestStatusFromQueryDataResponse(resp, innerErr)
Expand Down
36 changes: 32 additions & 4 deletions backend/data_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"google.golang.org/grpc/metadata"

"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana-plugin-sdk-go/experimental/featuretoggles"
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
"github.com/grafana/grafana-plugin-sdk-go/internal/tenant"
)
Expand Down Expand Up @@ -69,7 +70,7 @@ func TestQueryData(t *testing.T) {
t.Run("When forward HTTP headers enabled should forward headers", func(t *testing.T) {
ctx := context.Background()
handler := newFakeDataHandlerWithOAuth()
adapter := newDataSDKAdapter(handler)
adapter := newDataSDKAdapter(handler, nil)
_, err := adapter.QueryData(ctx, &pluginv2.QueryDataRequest{
Headers: map[string]string{
"Authorization": "Bearer 123",
Expand All @@ -95,7 +96,7 @@ func TestQueryData(t *testing.T) {
t.Run("When forward HTTP headers disable should not forward headers", func(t *testing.T) {
ctx := context.Background()
handler := newFakeDataHandlerWithOAuth()
adapter := newDataSDKAdapter(handler)
adapter := newDataSDKAdapter(handler, nil)
_, err := adapter.QueryData(ctx, &pluginv2.QueryDataRequest{
Headers: map[string]string{
"Authorization": "Bearer 123",
Expand All @@ -122,7 +123,7 @@ func TestQueryData(t *testing.T) {
a := newDataSDKAdapter(QueryDataHandlerFunc(func(ctx context.Context, _ *QueryDataRequest) (*QueryDataResponse, error) {
require.Equal(t, tid, tenant.IDFromContext(ctx))
return NewQueryDataResponse(), nil
}))
}), nil)

ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{
tenant.CtxKey: tid,
Expand Down Expand Up @@ -213,7 +214,7 @@ func TestQueryData(t *testing.T) {
a := newDataSDKAdapter(QueryDataHandlerFunc(func(ctx context.Context, _ *QueryDataRequest) (*QueryDataResponse, error) {
actualCtx = ctx
return tc.queryDataResponse, nil
}))
}), nil)
_, err := a.QueryData(context.Background(), &pluginv2.QueryDataRequest{
PluginContext: &pluginv2.PluginContext{},
})
Expand All @@ -229,6 +230,33 @@ func TestQueryData(t *testing.T) {
})
}
})

t.Run("When conversionHandler is defined", func(t *testing.T) {
oldQuery := &pluginv2.DataQuery{
TimeRange: &pluginv2.TimeRange{},
Json: []byte(`{"old":"value"}`),
}
a := newDataSDKAdapter(QueryDataHandlerFunc(func(_ context.Context, q *QueryDataRequest) (*QueryDataResponse, error) {
require.Len(t, q.Queries, 1)
// Assert that the query has been converted
require.Equal(t, string(`{"new":"value"}`), string(q.Queries[0].JSON))
return &QueryDataResponse{}, nil
}), ConvertQueryFunc(func(_ context.Context, req *QueryConversionRequest) (*QueryConversionResponse, error) {
require.Len(t, req.Queries, 1)
req.Queries[0].JSON = []byte(`{"new":"value"}`)
return &QueryConversionResponse{Queries: req.Queries}, nil
}))
_, err := a.QueryData(context.Background(), &pluginv2.QueryDataRequest{
PluginContext: &pluginv2.PluginContext{
// Enable feature flag
GrafanaConfig: map[string]string{
featuretoggles.EnabledFeatures: "dsQueryConvert",
},
},
Queries: []*pluginv2.DataQuery{oldQuery},
})
require.NoError(t, err)
})
}

var finalRoundTripper = httpclient.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
Expand Down
14 changes: 8 additions & 6 deletions backend/datasource/manage.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ func Manage(pluginID string, instanceFactory InstanceFactoryFunc, opts ManageOpt
}
handler := automanagement.NewManager(NewInstanceManager(instanceFactory))
return backend.Manage(pluginID, backend.ServeOpts{
CheckHealthHandler: handler,
CallResourceHandler: handler,
QueryDataHandler: handler,
StreamHandler: handler,
AdmissionHandler: opts.AdmissionHandler,
GRPCSettings: opts.GRPCSettings,
CheckHealthHandler: handler,
CallResourceHandler: handler,
QueryDataHandler: handler,
StreamHandler: handler,
QueryConversionHandler: handler,
AdmissionHandler: opts.AdmissionHandler,
GRPCSettings: opts.GRPCSettings,
ConversionHandler: opts.ConversionHandler,
})
}
2 changes: 1 addition & 1 deletion backend/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestContextualLogger(t *testing.T) {
checkCtxLogger(ctx, t, map[string]any{"endpoint": "queryData", "pluginID": pluginID})
run <- struct{}{}
return NewQueryDataResponse(), nil
}))
}), nil)
_, err := a.QueryData(context.Background(), &pluginv2.QueryDataRequest{
PluginContext: pCtx,
})
Expand Down
32 changes: 32 additions & 0 deletions backend/query_conversion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package backend

import (
"context"
)

// QueryConversionHandler is an EXPERIMENTAL service that allows converting queries between versions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on this should it live in an experimental package?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could be but I am just following the same approach than the other new handlers.


type QueryConversionHandler interface {
// ConvertQuery is called to covert queries between different versions
ConvertQuery(context.Context, *QueryConversionRequest) (*QueryConversionResponse, error)
}

type ConvertQueryFunc func(context.Context, *QueryConversionRequest) (*QueryConversionResponse, error)

// ConvertQuery calls fn(ctx, req).
func (fn ConvertQueryFunc) ConvertQuery(ctx context.Context, req *QueryConversionRequest) (*QueryConversionResponse, error) {
return fn(ctx, req)
}

// QueryConversionRequest supports converting a query from on version to another
type QueryConversionRequest struct {
// NOTE: this may not include app or datasource instance settings depending on the request
marefr marked this conversation as resolved.
Show resolved Hide resolved
PluginContext PluginContext `json:"pluginContext,omitempty"`
// Queries to convert. This contains the full metadata envelope.
Queries []DataQuery `json:"objects,omitempty"`
}

type QueryConversionResponse struct {
// Converted queries.
Queries []DataQuery `json:"objects,omitempty"`
}
6 changes: 5 additions & 1 deletion backend/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ type ServeOpts struct {
// This is EXPERIMENTAL and is a subject to change till Grafana 12
ConversionHandler ConversionHandler

// QueryConversionHandler converts queries between versions
// This is EXPERIMENTAL and is a subject to change till Grafana 12
QueryConversionHandler QueryConversionHandler

// GRPCSettings settings for gPRC.
GRPCSettings GRPCSettings
}
Expand All @@ -79,7 +83,7 @@ func GRPCServeOpts(opts ServeOpts) grpcplugin.ServeOpts {
}

if opts.QueryDataHandler != nil {
pluginOpts.DataServer = newDataSDKAdapter(opts.QueryDataHandler)
pluginOpts.DataServer = newDataSDKAdapter(opts.QueryDataHandler, opts.QueryConversionHandler)
}

if opts.StreamHandler != nil {
Expand Down
11 changes: 11 additions & 0 deletions internal/automanagement/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,14 @@ func (m *Manager) RunStream(ctx context.Context, req *backend.RunStreamRequest,
}
return status.Error(codes.Unimplemented, "unimplemented")
}

func (m *Manager) ConvertQuery(ctx context.Context, req *backend.QueryConversionRequest) (*backend.QueryConversionResponse, error) {
h, err := m.Get(ctx, req.PluginContext)
if err != nil {
return nil, err
}
if ds, ok := h.(backend.QueryConversionHandler); ok {
return ds.ConvertQuery(ctx, req)
}
return nil, status.Error(codes.Unimplemented, "unimplemented")
}