diff --git a/backend/data_adapter.go b/backend/data_adapter.go index 5c2d63508..a6a8bb0a3 100644 --- a/backend/data_adapter.go +++ b/backend/data_adapter.go @@ -6,19 +6,37 @@ import ( "fmt" "github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // dataSDKAdapter adapter between low level plugin protocol and SDK interfaces. type dataSDKAdapter struct { - queryDataHandler QueryDataHandler + queryDataHandler QueryDataHandler + queryConversionHandler QueryConversionHandler } -func newDataSDKAdapter(handler QueryDataHandler) *dataSDKAdapter { +func newDataSDKAdapter(handler QueryDataHandler, queryConversionHandler QueryConversionHandler) *dataSDKAdapter { return &dataSDKAdapter{ - queryDataHandler: handler, + queryDataHandler: handler, + queryConversionHandler: queryConversionHandler, } } +func (a *dataSDKAdapter) ConvertQueryData(ctx context.Context, req *QueryDataRequest) (*QueryDataRequest, error) { + convertRequest := &QueryConversionRequest{ + PluginContext: req.PluginContext, + Queries: req.Queries, + } + convertResponse, err := a.queryConversionHandler.ConvertQuery(ctx, convertRequest) + if err != nil { + return nil, err + } + req.Queries = convertResponse.Queries + + return req, nil +} + func (a *dataSDKAdapter) QueryData(ctx context.Context, req *pluginv2.QueryDataRequest) (*pluginv2.QueryDataResponse, error) { ctx = setupContext(ctx, EndpointQueryData) parsedReq := FromProto().QueryDataRequest(req) @@ -27,6 +45,19 @@ func (a *dataSDKAdapter) QueryData(ctx context.Context, req *pluginv2.QueryDataR err := wrapHandler(ctx, parsedReq.PluginContext, func(ctx context.Context) (RequestStatus, error) { ctx = withHeaderMiddleware(ctx, parsedReq.GetHTTPHeaders()) var innerErr error + if a.queryConversionHandler != nil && GrafanaConfigFromContext(ctx).FeatureToggles().IsEnabled("dsQueryConvert") { + convertedQuery, innerErr := a.ConvertQueryData(ctx, parsedReq) + if innerErr != nil { + if status.Code(innerErr) == codes.Unimplemented { + // The plugin does not implement query migration, disabling it + a.queryConversionHandler = nil + } else { + return RequestStatusError, innerErr + } + } else { + parsedReq = convertedQuery + } + } resp, innerErr = a.queryDataHandler.QueryData(ctx, parsedReq) status := RequestStatusFromQueryDataResponse(resp, innerErr) diff --git a/backend/data_adapter_test.go b/backend/data_adapter_test.go index f20ef284a..f3dfef521 100644 --- a/backend/data_adapter_test.go +++ b/backend/data_adapter_test.go @@ -14,6 +14,7 @@ import ( "google.golang.org/grpc/metadata" "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" + "github.com/grafana/grafana-plugin-sdk-go/experimental/featuretoggles" "github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2" "github.com/grafana/grafana-plugin-sdk-go/internal/tenant" ) @@ -69,7 +70,7 @@ func TestQueryData(t *testing.T) { t.Run("When forward HTTP headers enabled should forward headers", func(t *testing.T) { ctx := context.Background() handler := newFakeDataHandlerWithOAuth() - adapter := newDataSDKAdapter(handler) + adapter := newDataSDKAdapter(handler, nil) _, err := adapter.QueryData(ctx, &pluginv2.QueryDataRequest{ Headers: map[string]string{ "Authorization": "Bearer 123", @@ -95,7 +96,7 @@ func TestQueryData(t *testing.T) { t.Run("When forward HTTP headers disable should not forward headers", func(t *testing.T) { ctx := context.Background() handler := newFakeDataHandlerWithOAuth() - adapter := newDataSDKAdapter(handler) + adapter := newDataSDKAdapter(handler, nil) _, err := adapter.QueryData(ctx, &pluginv2.QueryDataRequest{ Headers: map[string]string{ "Authorization": "Bearer 123", @@ -122,7 +123,7 @@ func TestQueryData(t *testing.T) { a := newDataSDKAdapter(QueryDataHandlerFunc(func(ctx context.Context, _ *QueryDataRequest) (*QueryDataResponse, error) { require.Equal(t, tid, tenant.IDFromContext(ctx)) return NewQueryDataResponse(), nil - })) + }), nil) ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{ tenant.CtxKey: tid, @@ -213,7 +214,7 @@ func TestQueryData(t *testing.T) { a := newDataSDKAdapter(QueryDataHandlerFunc(func(ctx context.Context, _ *QueryDataRequest) (*QueryDataResponse, error) { actualCtx = ctx return tc.queryDataResponse, nil - })) + }), nil) _, err := a.QueryData(context.Background(), &pluginv2.QueryDataRequest{ PluginContext: &pluginv2.PluginContext{}, }) @@ -229,6 +230,33 @@ func TestQueryData(t *testing.T) { }) } }) + + t.Run("When conversionHandler is defined", func(t *testing.T) { + oldQuery := &pluginv2.DataQuery{ + TimeRange: &pluginv2.TimeRange{}, + Json: []byte(`{"old":"value"}`), + } + a := newDataSDKAdapter(QueryDataHandlerFunc(func(_ context.Context, q *QueryDataRequest) (*QueryDataResponse, error) { + require.Len(t, q.Queries, 1) + // Assert that the query has been converted + require.Equal(t, string(`{"new":"value"}`), string(q.Queries[0].JSON)) + return &QueryDataResponse{}, nil + }), ConvertQueryFunc(func(_ context.Context, req *QueryConversionRequest) (*QueryConversionResponse, error) { + require.Len(t, req.Queries, 1) + req.Queries[0].JSON = []byte(`{"new":"value"}`) + return &QueryConversionResponse{Queries: req.Queries}, nil + })) + _, err := a.QueryData(context.Background(), &pluginv2.QueryDataRequest{ + PluginContext: &pluginv2.PluginContext{ + // Enable feature flag + GrafanaConfig: map[string]string{ + featuretoggles.EnabledFeatures: "dsQueryConvert", + }, + }, + Queries: []*pluginv2.DataQuery{oldQuery}, + }) + require.NoError(t, err) + }) } var finalRoundTripper = httpclient.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { diff --git a/backend/datasource/manage.go b/backend/datasource/manage.go index 6b5c15f00..2abf2be18 100644 --- a/backend/datasource/manage.go +++ b/backend/datasource/manage.go @@ -45,11 +45,13 @@ func Manage(pluginID string, instanceFactory InstanceFactoryFunc, opts ManageOpt } handler := automanagement.NewManager(NewInstanceManager(instanceFactory)) return backend.Manage(pluginID, backend.ServeOpts{ - CheckHealthHandler: handler, - CallResourceHandler: handler, - QueryDataHandler: handler, - StreamHandler: handler, - AdmissionHandler: opts.AdmissionHandler, - GRPCSettings: opts.GRPCSettings, + CheckHealthHandler: handler, + CallResourceHandler: handler, + QueryDataHandler: handler, + StreamHandler: handler, + QueryConversionHandler: handler, + AdmissionHandler: opts.AdmissionHandler, + GRPCSettings: opts.GRPCSettings, + ConversionHandler: opts.ConversionHandler, }) } diff --git a/backend/log_test.go b/backend/log_test.go index 22255c67a..079659997 100644 --- a/backend/log_test.go +++ b/backend/log_test.go @@ -40,7 +40,7 @@ func TestContextualLogger(t *testing.T) { checkCtxLogger(ctx, t, map[string]any{"endpoint": "queryData", "pluginID": pluginID}) run <- struct{}{} return NewQueryDataResponse(), nil - })) + }), nil) _, err := a.QueryData(context.Background(), &pluginv2.QueryDataRequest{ PluginContext: pCtx, }) diff --git a/backend/query_conversion.go b/backend/query_conversion.go new file mode 100644 index 000000000..500fc7bfc --- /dev/null +++ b/backend/query_conversion.go @@ -0,0 +1,32 @@ +package backend + +import ( + "context" +) + +// QueryConversionHandler is an EXPERIMENTAL service that allows converting queries between versions + +type QueryConversionHandler interface { + // ConvertQuery is called to covert queries between different versions + ConvertQuery(context.Context, *QueryConversionRequest) (*QueryConversionResponse, error) +} + +type ConvertQueryFunc func(context.Context, *QueryConversionRequest) (*QueryConversionResponse, error) + +// ConvertQuery calls fn(ctx, req). +func (fn ConvertQueryFunc) ConvertQuery(ctx context.Context, req *QueryConversionRequest) (*QueryConversionResponse, error) { + return fn(ctx, req) +} + +// QueryConversionRequest supports converting a query from on version to another +type QueryConversionRequest struct { + // NOTE: this may not include app or datasource instance settings depending on the request + PluginContext PluginContext `json:"pluginContext,omitempty"` + // Queries to convert. This contains the full metadata envelope. + Queries []DataQuery `json:"objects,omitempty"` +} + +type QueryConversionResponse struct { + // Converted queries. + Queries []DataQuery `json:"objects,omitempty"` +} diff --git a/backend/serve.go b/backend/serve.go index 65b1814fe..aaa85d4b9 100644 --- a/backend/serve.go +++ b/backend/serve.go @@ -65,6 +65,10 @@ type ServeOpts struct { // This is EXPERIMENTAL and is a subject to change till Grafana 12 ConversionHandler ConversionHandler + // QueryConversionHandler converts queries between versions + // This is EXPERIMENTAL and is a subject to change till Grafana 12 + QueryConversionHandler QueryConversionHandler + // GRPCSettings settings for gPRC. GRPCSettings GRPCSettings } @@ -79,7 +83,7 @@ func GRPCServeOpts(opts ServeOpts) grpcplugin.ServeOpts { } if opts.QueryDataHandler != nil { - pluginOpts.DataServer = newDataSDKAdapter(opts.QueryDataHandler) + pluginOpts.DataServer = newDataSDKAdapter(opts.QueryDataHandler, opts.QueryConversionHandler) } if opts.StreamHandler != nil { diff --git a/internal/automanagement/manager.go b/internal/automanagement/manager.go index eac8a61c9..526bbfa20 100644 --- a/internal/automanagement/manager.go +++ b/internal/automanagement/manager.go @@ -92,3 +92,14 @@ func (m *Manager) RunStream(ctx context.Context, req *backend.RunStreamRequest, } return status.Error(codes.Unimplemented, "unimplemented") } + +func (m *Manager) ConvertQuery(ctx context.Context, req *backend.QueryConversionRequest) (*backend.QueryConversionResponse, error) { + h, err := m.Get(ctx, req.PluginContext) + if err != nil { + return nil, err + } + if ds, ok := h.(backend.QueryConversionHandler); ok { + return ds.ConvertQuery(ctx, req) + } + return nil, status.Error(codes.Unimplemented, "unimplemented") +}