diff --git a/pkg/filters/proxies/providerproxy/metrics.go b/pkg/filters/proxies/providerproxy/metrics.go index d628ccf946..3ad0c1e13f 100644 --- a/pkg/filters/proxies/providerproxy/metrics.go +++ b/pkg/filters/proxies/providerproxy/metrics.go @@ -1,7 +1,6 @@ package providerproxy import ( - "net/http" "strconv" "time" @@ -14,20 +13,22 @@ type ( TotalRequests *prometheus.CounterVec RequestsDuration prometheus.ObserverVec } + + RequestMetrics struct { + Provider string + RpcMethod string + StatusCode int + Duration time.Duration + } ) func (m *ProviderProxy) newMetrics() *metrics { - commonLabels := prometheus.Labels{ "pipelineName": m.Name(), "kind": Kind, - "clusterName": m.spec.Super().Options().ClusterName, - "clusterRole": m.spec.Super().Options().ClusterRole, - "instanceName": m.spec.Super().Options().Name, } prometheusLabels := []string{ - "clusterName", "clusterRole", "instanceName", "pipelineName", "kind", - "policy", "statusCode", "provider", + "pipelineName", "kind", "policy", "statusCode", "provider", "rpcMethod", } return &metrics{ @@ -43,18 +44,14 @@ func (m *ProviderProxy) newMetrics() *metrics { } } -type RequestStat struct { - StatusCode int // e.g. 200 - Duration time.Duration - Method *string // rpc provider method e.g. eth_blockNumber -} - -func (m *ProviderProxy) collectMetrics(providerUrl string, response *http.Response) { +func (m *ProviderProxy) collectMetrics(requestMetrics RequestMetrics) { labels := prometheus.Labels{ "policy": m.spec.Policy, - "statusCode": strconv.Itoa(response.StatusCode), - "provider": providerUrl, + "statusCode": strconv.Itoa(requestMetrics.StatusCode), + "provider": requestMetrics.Provider, + "rpcMethod": requestMetrics.RpcMethod, } m.metrics.TotalRequests.With(labels).Inc() + m.metrics.RequestsDuration.With(labels).Observe(float64(requestMetrics.Duration.Milliseconds())) } diff --git a/pkg/filters/proxies/providerproxy/providerproxy.go b/pkg/filters/proxies/providerproxy/providerproxy.go index 0ebbbff5a6..666255c499 100644 --- a/pkg/filters/proxies/providerproxy/providerproxy.go +++ b/pkg/filters/proxies/providerproxy/providerproxy.go @@ -18,6 +18,7 @@ package providerproxy import ( + "encoding/json" "errors" "net/http" "net/url" @@ -28,6 +29,7 @@ import ( "github.com/megaease/easegress/v2/pkg/logger" "github.com/megaease/easegress/v2/pkg/protocols/httpprot" "github.com/megaease/easegress/v2/pkg/supervisor" + "github.com/megaease/easegress/v2/pkg/util/fasttime" ) const ( @@ -62,8 +64,29 @@ func (m *ProviderProxy) SelectNode() (*url.URL, error) { return url.Parse(rpcUrl) } +func (m *ProviderProxy) ParsePayloadMethod(payload []byte) string { + defaultValue := "UNKNOWN" + if len(payload) <= 0 { + return defaultValue + } + + jsonBody := map[string]interface{}{} + err := json.Unmarshal(payload, &jsonBody) + if err != nil { + return defaultValue + } + + method, exists := jsonBody["method"].(string) + if !exists { + return defaultValue + } + return method +} + func (m *ProviderProxy) Handle(ctx *context.Context) (result string) { + requestMetrics := RequestMetrics{} + startTime := fasttime.Now() reqUrl, err := m.SelectNode() if err != nil { logger.Errorf(err.Error()) @@ -71,7 +94,11 @@ func (m *ProviderProxy) Handle(ctx *context.Context) (result string) { } logger.Infof("select rpc provider: %s", reqUrl.String()) + requestMetrics.Provider = reqUrl.String() req := ctx.GetInputRequest().(*httpprot.Request) + + requestMetrics.RpcMethod = m.ParsePayloadMethod(req.RawPayload()) + forwardReq, err := http.NewRequestWithContext(req.Context(), req.Method(), reqUrl.String(), req.GetPayload()) if err != nil { logger.Errorf(err.Error()) @@ -83,7 +110,10 @@ func (m *ProviderProxy) Handle(ctx *context.Context) (result string) { } response, err := m.client.Do(forwardReq) - defer m.collectMetrics(reqUrl.String(), response) + + requestMetrics.Duration = fasttime.Since(startTime) + requestMetrics.StatusCode = response.StatusCode + defer m.collectMetrics(requestMetrics) if err != nil { logger.Errorf(err.Error()) @@ -100,7 +130,6 @@ func (m *ProviderProxy) Handle(ctx *context.Context) (result string) { logger.Errorf("%s: failed to fetch response payload: %v, please consider to set serverMaxBodySize of SimpleHTTPProxy to -1.", m.Name(), err) return err.Error() } - ctx.SetResponse(context.DefaultNamespace, outputResponse) return "" } diff --git a/pkg/filters/proxies/providerproxy/providerproxy_test.go b/pkg/filters/proxies/providerproxy/providerproxy_test.go index 637f496b92..e88ec7ba25 100644 --- a/pkg/filters/proxies/providerproxy/providerproxy_test.go +++ b/pkg/filters/proxies/providerproxy/providerproxy_test.go @@ -18,15 +18,18 @@ package providerproxy import ( - "bytes" + "fmt" "net/http" "os" + "strings" "testing" "github.com/megaease/easegress/v2/pkg/context" "github.com/megaease/easegress/v2/pkg/filters" "github.com/megaease/easegress/v2/pkg/logger" + "github.com/megaease/easegress/v2/pkg/option" "github.com/megaease/easegress/v2/pkg/protocols/httpprot" + "github.com/megaease/easegress/v2/pkg/supervisor" "github.com/megaease/easegress/v2/pkg/tracing" "github.com/megaease/easegress/v2/pkg/util/codectool" "github.com/stretchr/testify/assert" @@ -39,11 +42,25 @@ func TestMain(m *testing.M) { } func newTestProviderProxy(yamlConfig string, assert *assert.Assertions) *ProviderProxy { + defer func() { + if err := recover(); err != nil { + fmt.Printf("Recovered from panic: %v\n", err) + } + }() + rawSpec := make(map[string]interface{}) err := codectool.Unmarshal([]byte(yamlConfig), &rawSpec) assert.NoError(err) - spec, err := filters.NewSpec(nil, "", rawSpec) + opt := option.New() + opt.Name = "test" + opt.ClusterName = "test" + opt.ClusterRole = "secondary" + + super := supervisor.NewMock(opt, nil, nil, + nil, false, nil, nil) + + spec, err := filters.NewSpec(super, "", rawSpec) assert.NoError(err) proxy := kind.CreateInstance(spec).(*ProviderProxy) @@ -61,7 +78,10 @@ func getCtx(stdr *http.Request) *context.Context { req.HTTPHeader().Set(key, stdr.Header.Get(key)) } - _ = req.FetchPayload(1024 * 1024) + err := req.FetchPayload(1024 * 1024) + if err != nil { + logger.Errorf(err.Error()) + } ctx := context.New(tracing.NoopSpan) ctx.SetRequest(context.DefaultNamespace, req) return ctx @@ -74,17 +94,47 @@ func TestProviderProxy(t *testing.T) { name: providerProxy kind: ProviderProxy urls: - - https://ethereum-mainnet.s.chainbase.online + - https://eth.llamarpc.com ` proxy := newTestProviderProxy(yamlConfig, assert) postData := "{\"method\":\"eth_blockNumber\",\"params\":[],\"id\":1,\"jsonrpc\":\"2.0\"}" - stdr, _ := http.NewRequest(http.MethodPost, "https://www.megaease.com", bytes.NewReader([]byte(postData))) + stdr, _ := http.NewRequest(http.MethodPost, "https://www.megaease.com", strings.NewReader(postData)) stdr.Header.Set("Content-Type", "application/json") ctx := getCtx(stdr) response := proxy.Handle(ctx) assert.Equal("", response) - assert.NotNil(string(ctx.GetOutputResponse().RawPayload())) + assert.NotNil(ctx.GetResponse(context.DefaultNamespace).GetPayload()) + + proxy.Close() +} + +func TestProviderProxy_ParsePayloadMethod(t *testing.T) { + assert := assert.New(t) + + const yamlConfig = ` +name: providerProxy +kind: ProviderProxy +urls: + - https://eth.llamarpc.com +` + proxy := newTestProviderProxy(yamlConfig, assert) + + method := proxy.ParsePayloadMethod([]byte("{\"method\":\"eth_blockNumber\",\"params\":[],\"id\":1,\"jsonrpc\":\"2.0\"}")) + assert.Equal("eth_blockNumber", method) + + method = proxy.ParsePayloadMethod([]byte("{\"method\":\"eth_getBlockByNumber\",\"params\":[\"0xc5043f\",false],\"id\":1,\"jsonrpc\":\"2.0\"}")) + assert.Equal("eth_getBlockByNumber", method) + + method = proxy.ParsePayloadMethod([]byte("test unknown payload")) + assert.Equal("UNKNOWN", method) + + method = proxy.ParsePayloadMethod([]byte{}) + assert.Equal("UNKNOWN", method) + + method = proxy.ParsePayloadMethod([]byte("{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"suix_getAllBalances\",\"params\":[\"0x94f1a597b4e8f709a396f7f6b1482bdcd65a673d111e49286c527fab7c2d0961\"]}")) + assert.Equal("suix_getAllBalances", method) + proxy.Close() } diff --git a/pkg/filters/proxies/providerproxy/selector/block_lag_selector.go b/pkg/filters/proxies/providerproxy/selector/block_lag_selector.go index ca149804fb..fa7b2f12f4 100644 --- a/pkg/filters/proxies/providerproxy/selector/block_lag_selector.go +++ b/pkg/filters/proxies/providerproxy/selector/block_lag_selector.go @@ -181,13 +181,9 @@ func newMetrics(super *supervisor.Supervisor) *metrics { commonLabels := prometheus.Labels{ "pipelineName": super.Options().Name, "kind": "BlockLagProviderSelector", - "clusterName": super.Options().ClusterName, - "clusterRole": super.Options().ClusterRole, - "instanceName": super.Options().Name, } prometheusLabels := []string{ - "clusterName", "clusterRole", "instanceName", "pipelineName", "kind", - "provider", + "pipelineName", "kind", "provider", } return &metrics{