From ad1d39b9ffa6031c5dd1f92d37826d75ba89f05b Mon Sep 17 00:00:00 2001 From: Andres Martinez Gotor Date: Wed, 4 Sep 2024 17:00:34 +0200 Subject: [PATCH 1/3] Add QueryConversionHandler --- backend/data_adapter.go | 28 +++++++++++++++++++++++++--- backend/data_adapter_test.go | 30 ++++++++++++++++++++++++++---- backend/datasource/manage.go | 17 +++++++++++------ backend/log_test.go | 2 +- backend/query_conversion.go | 32 ++++++++++++++++++++++++++++++++ backend/serve.go | 8 ++++++-- 6 files changed, 101 insertions(+), 16 deletions(-) create mode 100644 backend/query_conversion.go diff --git a/backend/data_adapter.go b/backend/data_adapter.go index 5c2d63508..a34aa4087 100644 --- a/backend/data_adapter.go +++ b/backend/data_adapter.go @@ -10,15 +10,31 @@ import ( // dataSDKAdapter adapter between low level plugin protocol and SDK interfaces. type dataSDKAdapter struct { - queryDataHandler QueryDataHandler + queryDataHandler QueryDataHandler + queryConversionHandler QueryConversionHandler // Optional } -func newDataSDKAdapter(handler QueryDataHandler) *dataSDKAdapter { +func newDataSDKAdapter(handler QueryDataHandler, queryConversionHandler QueryConversionHandler) *dataSDKAdapter { return &dataSDKAdapter{ - queryDataHandler: handler, + queryDataHandler: handler, + queryConversionHandler: queryConversionHandler, } } +func (a *dataSDKAdapter) ConvertQueryData(ctx context.Context, req *QueryDataRequest) (*QueryDataRequest, error) { + convertRequest := &QueryConversionRequest{ + PluginContext: req.PluginContext, + Queries: req.Queries, + } + convertResponse, err := a.queryConversionHandler.ConvertQuery(ctx, convertRequest) + if err != nil { + return nil, err + } + req.Queries = convertResponse.Queries + + return req, nil +} + func (a *dataSDKAdapter) QueryData(ctx context.Context, req *pluginv2.QueryDataRequest) (*pluginv2.QueryDataResponse, error) { ctx = setupContext(ctx, EndpointQueryData) parsedReq := FromProto().QueryDataRequest(req) @@ -27,6 +43,12 @@ func (a *dataSDKAdapter) QueryData(ctx context.Context, req *pluginv2.QueryDataR err := wrapHandler(ctx, parsedReq.PluginContext, func(ctx context.Context) (RequestStatus, error) { ctx = withHeaderMiddleware(ctx, parsedReq.GetHTTPHeaders()) var innerErr error + if a.queryConversionHandler != nil && GrafanaConfigFromContext(ctx).FeatureToggles().IsEnabled("dsQueryConvert") { + parsedReq, innerErr = a.ConvertQueryData(ctx, parsedReq) + if innerErr != nil { + return RequestStatusError, innerErr + } + } resp, innerErr = a.queryDataHandler.QueryData(ctx, parsedReq) status := RequestStatusFromQueryDataResponse(resp, innerErr) diff --git a/backend/data_adapter_test.go b/backend/data_adapter_test.go index f20ef284a..0a5e540ce 100644 --- a/backend/data_adapter_test.go +++ b/backend/data_adapter_test.go @@ -69,7 +69,7 @@ func TestQueryData(t *testing.T) { t.Run("When forward HTTP headers enabled should forward headers", func(t *testing.T) { ctx := context.Background() handler := newFakeDataHandlerWithOAuth() - adapter := newDataSDKAdapter(handler) + adapter := newDataSDKAdapter(handler, nil) _, err := adapter.QueryData(ctx, &pluginv2.QueryDataRequest{ Headers: map[string]string{ "Authorization": "Bearer 123", @@ -95,7 +95,7 @@ func TestQueryData(t *testing.T) { t.Run("When forward HTTP headers disable should not forward headers", func(t *testing.T) { ctx := context.Background() handler := newFakeDataHandlerWithOAuth() - adapter := newDataSDKAdapter(handler) + adapter := newDataSDKAdapter(handler, nil) _, err := adapter.QueryData(ctx, &pluginv2.QueryDataRequest{ Headers: map[string]string{ "Authorization": "Bearer 123", @@ -122,7 +122,7 @@ func TestQueryData(t *testing.T) { a := newDataSDKAdapter(QueryDataHandlerFunc(func(ctx context.Context, _ *QueryDataRequest) (*QueryDataResponse, error) { require.Equal(t, tid, tenant.IDFromContext(ctx)) return NewQueryDataResponse(), nil - })) + }), nil) ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{ tenant.CtxKey: tid, @@ -213,7 +213,7 @@ func TestQueryData(t *testing.T) { a := newDataSDKAdapter(QueryDataHandlerFunc(func(ctx context.Context, _ *QueryDataRequest) (*QueryDataResponse, error) { actualCtx = ctx return tc.queryDataResponse, nil - })) + }), nil) _, err := a.QueryData(context.Background(), &pluginv2.QueryDataRequest{ PluginContext: &pluginv2.PluginContext{}, }) @@ -229,6 +229,28 @@ func TestQueryData(t *testing.T) { }) } }) + + t.Run("When conversionHandler is defined", func(t *testing.T) { + oldQuery := &pluginv2.DataQuery{ + TimeRange: &pluginv2.TimeRange{}, + Json: []byte(`{"old":"value"}`), + } + a := newDataSDKAdapter(QueryDataHandlerFunc(func(_ context.Context, q *QueryDataRequest) (*QueryDataResponse, error) { + require.Len(t, q.Queries, 1) + // Assert that the query has been converted + require.Equal(t, string(`{"new":"value"}`), string(q.Queries[0].JSON)) + return &QueryDataResponse{}, nil + }), ConvertQueryFunc(func(_ context.Context, req *QueryConversionRequest) (*QueryConversionResponse, error) { + require.Len(t, req.Queries, 1) + req.Queries[0].JSON = []byte(`{"new":"value"}`) + return &QueryConversionResponse{Queries: req.Queries}, nil + })) + _, err := a.QueryData(context.Background(), &pluginv2.QueryDataRequest{ + PluginContext: &pluginv2.PluginContext{}, + Queries: []*pluginv2.DataQuery{oldQuery}, + }) + require.NoError(t, err) + }) } var finalRoundTripper = httpclient.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { diff --git a/backend/datasource/manage.go b/backend/datasource/manage.go index 6b5c15f00..6fc1464bf 100644 --- a/backend/datasource/manage.go +++ b/backend/datasource/manage.go @@ -24,6 +24,9 @@ type ManageOpts struct { // Stateless conversion handler ConversionHandler backend.ConversionHandler + + // Stateless query conversion handler + QueryConversionHandler backend.QueryConversionHandler } // Manage starts serving the data source over gPRC with automatic instance management. @@ -45,11 +48,13 @@ func Manage(pluginID string, instanceFactory InstanceFactoryFunc, opts ManageOpt } handler := automanagement.NewManager(NewInstanceManager(instanceFactory)) return backend.Manage(pluginID, backend.ServeOpts{ - CheckHealthHandler: handler, - CallResourceHandler: handler, - QueryDataHandler: handler, - StreamHandler: handler, - AdmissionHandler: opts.AdmissionHandler, - GRPCSettings: opts.GRPCSettings, + CheckHealthHandler: handler, + CallResourceHandler: handler, + QueryDataHandler: handler, + StreamHandler: handler, + AdmissionHandler: opts.AdmissionHandler, + GRPCSettings: opts.GRPCSettings, + ConversionHandler: opts.ConversionHandler, + QueryConversionHandler: opts.QueryConversionHandler, }) } diff --git a/backend/log_test.go b/backend/log_test.go index 22255c67a..079659997 100644 --- a/backend/log_test.go +++ b/backend/log_test.go @@ -40,7 +40,7 @@ func TestContextualLogger(t *testing.T) { checkCtxLogger(ctx, t, map[string]any{"endpoint": "queryData", "pluginID": pluginID}) run <- struct{}{} return NewQueryDataResponse(), nil - })) + }), nil) _, err := a.QueryData(context.Background(), &pluginv2.QueryDataRequest{ PluginContext: pCtx, }) diff --git a/backend/query_conversion.go b/backend/query_conversion.go new file mode 100644 index 000000000..98ee48307 --- /dev/null +++ b/backend/query_conversion.go @@ -0,0 +1,32 @@ +package backend + +import ( + "context" +) + +// QueryConversionHandler is an EXPERIMENTAL service that allows converting queries between versions + +type QueryConversionHandler interface { + // ConvertQuery is called to covert queries between different versions + ConvertQuery(context.Context, *QueryConversionRequest) (*QueryConversionResponse, error) +} + +type ConvertQueryFunc func(context.Context, *QueryConversionRequest) (*QueryConversionResponse, error) + +// ConvertObjects calls fn(ctx, req). +func (fn ConvertQueryFunc) ConvertQuery(ctx context.Context, req *QueryConversionRequest) (*QueryConversionResponse, error) { + return fn(ctx, req) +} + +// ConversionRequest supports converting an object from on version to another +type QueryConversionRequest struct { + // NOTE: this may not include app or datasource instance settings depending on the request + PluginContext PluginContext `json:"pluginContext,omitempty"` + // Queries to convert. This contains the full metadata envelope. + Queries []DataQuery `json:"objects,omitempty"` +} + +type QueryConversionResponse struct { + // Converted queries. + Queries []DataQuery `json:"objects,omitempty"` +} diff --git a/backend/serve.go b/backend/serve.go index 65b1814fe..74564428c 100644 --- a/backend/serve.go +++ b/backend/serve.go @@ -65,6 +65,10 @@ type ServeOpts struct { // This is EXPERIMENTAL and is a subject to change till Grafana 12 ConversionHandler ConversionHandler + // QueryConversionHandler converts queries between versions + // This is EXPERIMENTAL and is a subject to change till Grafana 12 + QueryConversionHandler QueryConversionHandler + // GRPCSettings settings for gPRC. GRPCSettings GRPCSettings } @@ -79,7 +83,7 @@ func GRPCServeOpts(opts ServeOpts) grpcplugin.ServeOpts { } if opts.QueryDataHandler != nil { - pluginOpts.DataServer = newDataSDKAdapter(opts.QueryDataHandler) + pluginOpts.DataServer = newDataSDKAdapter(opts.QueryDataHandler, opts.QueryConversionHandler) } if opts.StreamHandler != nil { @@ -90,7 +94,7 @@ func GRPCServeOpts(opts ServeOpts) grpcplugin.ServeOpts { pluginOpts.AdmissionServer = newAdmissionSDKAdapter(opts.AdmissionHandler) } - if opts.ConversionHandler != nil { + if opts.ConversionHandler != nil || opts.QueryConversionHandler != nil { pluginOpts.ConversionServer = newConversionSDKAdapter(opts.ConversionHandler) } return pluginOpts From 45114933a2cfdeaafd8c03e413a19be18edb8707 Mon Sep 17 00:00:00 2001 From: Andres Martinez Gotor Date: Thu, 5 Sep 2024 11:10:14 +0200 Subject: [PATCH 2/3] auto handling --- backend/data_adapter.go | 15 ++++++++++++--- backend/data_adapter_test.go | 10 ++++++++-- backend/datasource/manage.go | 5 +---- backend/serve.go | 2 +- internal/automanagement/manager.go | 11 +++++++++++ 5 files changed, 33 insertions(+), 10 deletions(-) diff --git a/backend/data_adapter.go b/backend/data_adapter.go index a34aa4087..a6a8bb0a3 100644 --- a/backend/data_adapter.go +++ b/backend/data_adapter.go @@ -6,12 +6,14 @@ import ( "fmt" "github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // dataSDKAdapter adapter between low level plugin protocol and SDK interfaces. type dataSDKAdapter struct { queryDataHandler QueryDataHandler - queryConversionHandler QueryConversionHandler // Optional + queryConversionHandler QueryConversionHandler } func newDataSDKAdapter(handler QueryDataHandler, queryConversionHandler QueryConversionHandler) *dataSDKAdapter { @@ -44,9 +46,16 @@ func (a *dataSDKAdapter) QueryData(ctx context.Context, req *pluginv2.QueryDataR ctx = withHeaderMiddleware(ctx, parsedReq.GetHTTPHeaders()) var innerErr error if a.queryConversionHandler != nil && GrafanaConfigFromContext(ctx).FeatureToggles().IsEnabled("dsQueryConvert") { - parsedReq, innerErr = a.ConvertQueryData(ctx, parsedReq) + convertedQuery, innerErr := a.ConvertQueryData(ctx, parsedReq) if innerErr != nil { - return RequestStatusError, innerErr + if status.Code(innerErr) == codes.Unimplemented { + // The plugin does not implement query migration, disabling it + a.queryConversionHandler = nil + } else { + return RequestStatusError, innerErr + } + } else { + parsedReq = convertedQuery } } resp, innerErr = a.queryDataHandler.QueryData(ctx, parsedReq) diff --git a/backend/data_adapter_test.go b/backend/data_adapter_test.go index 0a5e540ce..f3dfef521 100644 --- a/backend/data_adapter_test.go +++ b/backend/data_adapter_test.go @@ -14,6 +14,7 @@ import ( "google.golang.org/grpc/metadata" "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" + "github.com/grafana/grafana-plugin-sdk-go/experimental/featuretoggles" "github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2" "github.com/grafana/grafana-plugin-sdk-go/internal/tenant" ) @@ -246,8 +247,13 @@ func TestQueryData(t *testing.T) { return &QueryConversionResponse{Queries: req.Queries}, nil })) _, err := a.QueryData(context.Background(), &pluginv2.QueryDataRequest{ - PluginContext: &pluginv2.PluginContext{}, - Queries: []*pluginv2.DataQuery{oldQuery}, + PluginContext: &pluginv2.PluginContext{ + // Enable feature flag + GrafanaConfig: map[string]string{ + featuretoggles.EnabledFeatures: "dsQueryConvert", + }, + }, + Queries: []*pluginv2.DataQuery{oldQuery}, }) require.NoError(t, err) }) diff --git a/backend/datasource/manage.go b/backend/datasource/manage.go index 6fc1464bf..2abf2be18 100644 --- a/backend/datasource/manage.go +++ b/backend/datasource/manage.go @@ -24,9 +24,6 @@ type ManageOpts struct { // Stateless conversion handler ConversionHandler backend.ConversionHandler - - // Stateless query conversion handler - QueryConversionHandler backend.QueryConversionHandler } // Manage starts serving the data source over gPRC with automatic instance management. @@ -52,9 +49,9 @@ func Manage(pluginID string, instanceFactory InstanceFactoryFunc, opts ManageOpt CallResourceHandler: handler, QueryDataHandler: handler, StreamHandler: handler, + QueryConversionHandler: handler, AdmissionHandler: opts.AdmissionHandler, GRPCSettings: opts.GRPCSettings, ConversionHandler: opts.ConversionHandler, - QueryConversionHandler: opts.QueryConversionHandler, }) } diff --git a/backend/serve.go b/backend/serve.go index 74564428c..aaa85d4b9 100644 --- a/backend/serve.go +++ b/backend/serve.go @@ -94,7 +94,7 @@ func GRPCServeOpts(opts ServeOpts) grpcplugin.ServeOpts { pluginOpts.AdmissionServer = newAdmissionSDKAdapter(opts.AdmissionHandler) } - if opts.ConversionHandler != nil || opts.QueryConversionHandler != nil { + if opts.ConversionHandler != nil { pluginOpts.ConversionServer = newConversionSDKAdapter(opts.ConversionHandler) } return pluginOpts diff --git a/internal/automanagement/manager.go b/internal/automanagement/manager.go index eac8a61c9..526bbfa20 100644 --- a/internal/automanagement/manager.go +++ b/internal/automanagement/manager.go @@ -92,3 +92,14 @@ func (m *Manager) RunStream(ctx context.Context, req *backend.RunStreamRequest, } return status.Error(codes.Unimplemented, "unimplemented") } + +func (m *Manager) ConvertQuery(ctx context.Context, req *backend.QueryConversionRequest) (*backend.QueryConversionResponse, error) { + h, err := m.Get(ctx, req.PluginContext) + if err != nil { + return nil, err + } + if ds, ok := h.(backend.QueryConversionHandler); ok { + return ds.ConvertQuery(ctx, req) + } + return nil, status.Error(codes.Unimplemented, "unimplemented") +} From 047a393c212e990e6018d767f38bcdf8b2b72014 Mon Sep 17 00:00:00 2001 From: Andres Martinez Gotor Date: Thu, 5 Sep 2024 11:23:49 +0200 Subject: [PATCH 3/3] update comments --- backend/query_conversion.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/query_conversion.go b/backend/query_conversion.go index 98ee48307..500fc7bfc 100644 --- a/backend/query_conversion.go +++ b/backend/query_conversion.go @@ -13,12 +13,12 @@ type QueryConversionHandler interface { type ConvertQueryFunc func(context.Context, *QueryConversionRequest) (*QueryConversionResponse, error) -// ConvertObjects calls fn(ctx, req). +// ConvertQuery calls fn(ctx, req). func (fn ConvertQueryFunc) ConvertQuery(ctx context.Context, req *QueryConversionRequest) (*QueryConversionResponse, error) { return fn(ctx, req) } -// ConversionRequest supports converting an object from on version to another +// QueryConversionRequest supports converting a query from on version to another type QueryConversionRequest struct { // NOTE: this may not include app or datasource instance settings depending on the request PluginContext PluginContext `json:"pluginContext,omitempty"`