Skip to content

Commit

Permalink
feat: add rpc method prometheus label
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanViast committed Oct 18, 2024
1 parent 76eb39f commit b84615a
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 29 deletions.
29 changes: 13 additions & 16 deletions pkg/filters/proxies/providerproxy/metrics.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package providerproxy

import (
"net/http"
"strconv"
"time"

Expand All @@ -14,20 +13,22 @@ type (
TotalRequests *prometheus.CounterVec
RequestsDuration prometheus.ObserverVec
}

RequestMetrics struct {
Provider string
RpcMethod string
StatusCode int
Duration time.Duration
}
)

func (m *ProviderProxy) newMetrics() *metrics {

commonLabels := prometheus.Labels{
"pipelineName": m.Name(),
"kind": Kind,
"clusterName": m.spec.Super().Options().ClusterName,
"clusterRole": m.spec.Super().Options().ClusterRole,
"instanceName": m.spec.Super().Options().Name,
}
prometheusLabels := []string{
"clusterName", "clusterRole", "instanceName", "pipelineName", "kind",
"policy", "statusCode", "provider",
"pipelineName", "kind", "policy", "statusCode", "provider", "rpcMethod",
}

return &metrics{
Expand All @@ -43,18 +44,14 @@ func (m *ProviderProxy) newMetrics() *metrics {
}
}

type RequestStat struct {
StatusCode int // e.g. 200
Duration time.Duration
Method *string // rpc provider method e.g. eth_blockNumber
}

func (m *ProviderProxy) collectMetrics(providerUrl string, response *http.Response) {
func (m *ProviderProxy) collectMetrics(requestMetrics RequestMetrics) {
labels := prometheus.Labels{
"policy": m.spec.Policy,
"statusCode": strconv.Itoa(response.StatusCode),
"provider": providerUrl,
"statusCode": strconv.Itoa(requestMetrics.StatusCode),
"provider": requestMetrics.Provider,
"rpcMethod": requestMetrics.RpcMethod,
}

m.metrics.TotalRequests.With(labels).Inc()
m.metrics.RequestsDuration.With(labels).Observe(float64(requestMetrics.Duration.Milliseconds()))
}
33 changes: 31 additions & 2 deletions pkg/filters/proxies/providerproxy/providerproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package providerproxy

import (
"encoding/json"
"errors"
"net/http"
"net/url"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/megaease/easegress/v2/pkg/logger"
"github.com/megaease/easegress/v2/pkg/protocols/httpprot"
"github.com/megaease/easegress/v2/pkg/supervisor"
"github.com/megaease/easegress/v2/pkg/util/fasttime"
)

const (
Expand Down Expand Up @@ -62,16 +64,41 @@ func (m *ProviderProxy) SelectNode() (*url.URL, error) {
return url.Parse(rpcUrl)
}

func (m *ProviderProxy) ParsePayloadMethod(payload []byte) string {
defaultValue := "UNKNOWN"
if len(payload) <= 0 {
return defaultValue
}

jsonBody := map[string]interface{}{}
err := json.Unmarshal(payload, &jsonBody)
if err != nil {
return defaultValue
}

method, exists := jsonBody["method"].(string)
if !exists {
return defaultValue
}
return method
}

func (m *ProviderProxy) Handle(ctx *context.Context) (result string) {
requestMetrics := RequestMetrics{}

startTime := fasttime.Now()
reqUrl, err := m.SelectNode()
if err != nil {
logger.Errorf(err.Error())
return err.Error()
}

logger.Infof("select rpc provider: %s", reqUrl.String())
requestMetrics.Provider = reqUrl.String()
req := ctx.GetInputRequest().(*httpprot.Request)

requestMetrics.RpcMethod = m.ParsePayloadMethod(req.RawPayload())

forwardReq, err := http.NewRequestWithContext(req.Context(), req.Method(), reqUrl.String(), req.GetPayload())
if err != nil {
logger.Errorf(err.Error())
Expand All @@ -83,7 +110,10 @@ func (m *ProviderProxy) Handle(ctx *context.Context) (result string) {
}

response, err := m.client.Do(forwardReq)
defer m.collectMetrics(reqUrl.String(), response)

requestMetrics.Duration = fasttime.Since(startTime)
requestMetrics.StatusCode = response.StatusCode
defer m.collectMetrics(requestMetrics)

if err != nil {
logger.Errorf(err.Error())
Expand All @@ -100,7 +130,6 @@ func (m *ProviderProxy) Handle(ctx *context.Context) (result string) {
logger.Errorf("%s: failed to fetch response payload: %v, please consider to set serverMaxBodySize of SimpleHTTPProxy to -1.", m.Name(), err)
return err.Error()
}

ctx.SetResponse(context.DefaultNamespace, outputResponse)
return ""
}
Expand Down
62 changes: 56 additions & 6 deletions pkg/filters/proxies/providerproxy/providerproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
package providerproxy

import (
"bytes"
"fmt"
"net/http"
"os"
"strings"
"testing"

"github.com/megaease/easegress/v2/pkg/context"
"github.com/megaease/easegress/v2/pkg/filters"
"github.com/megaease/easegress/v2/pkg/logger"
"github.com/megaease/easegress/v2/pkg/option"
"github.com/megaease/easegress/v2/pkg/protocols/httpprot"
"github.com/megaease/easegress/v2/pkg/supervisor"
"github.com/megaease/easegress/v2/pkg/tracing"
"github.com/megaease/easegress/v2/pkg/util/codectool"
"github.com/stretchr/testify/assert"
Expand All @@ -39,11 +42,25 @@ func TestMain(m *testing.M) {
}

func newTestProviderProxy(yamlConfig string, assert *assert.Assertions) *ProviderProxy {
defer func() {
if err := recover(); err != nil {
fmt.Printf("Recovered from panic: %v\n", err)
}
}()

rawSpec := make(map[string]interface{})
err := codectool.Unmarshal([]byte(yamlConfig), &rawSpec)
assert.NoError(err)

spec, err := filters.NewSpec(nil, "", rawSpec)
opt := option.New()
opt.Name = "test"
opt.ClusterName = "test"
opt.ClusterRole = "secondary"

super := supervisor.NewMock(opt, nil, nil,
nil, false, nil, nil)

spec, err := filters.NewSpec(super, "", rawSpec)
assert.NoError(err)

proxy := kind.CreateInstance(spec).(*ProviderProxy)
Expand All @@ -61,7 +78,10 @@ func getCtx(stdr *http.Request) *context.Context {
req.HTTPHeader().Set(key, stdr.Header.Get(key))
}

_ = req.FetchPayload(1024 * 1024)
err := req.FetchPayload(1024 * 1024)
if err != nil {
logger.Errorf(err.Error())
}
ctx := context.New(tracing.NoopSpan)
ctx.SetRequest(context.DefaultNamespace, req)
return ctx
Expand All @@ -74,17 +94,47 @@ func TestProviderProxy(t *testing.T) {
name: providerProxy
kind: ProviderProxy
urls:
- https://ethereum-mainnet.s.chainbase.online
- https://eth.llamarpc.com
`
proxy := newTestProviderProxy(yamlConfig, assert)

postData := "{\"method\":\"eth_blockNumber\",\"params\":[],\"id\":1,\"jsonrpc\":\"2.0\"}"

stdr, _ := http.NewRequest(http.MethodPost, "https://www.megaease.com", bytes.NewReader([]byte(postData)))
stdr, _ := http.NewRequest(http.MethodPost, "https://www.megaease.com", strings.NewReader(postData))
stdr.Header.Set("Content-Type", "application/json")
ctx := getCtx(stdr)
response := proxy.Handle(ctx)
assert.Equal("", response)
assert.NotNil(string(ctx.GetOutputResponse().RawPayload()))
assert.NotNil(ctx.GetResponse(context.DefaultNamespace).GetPayload())

proxy.Close()
}

func TestProviderProxy_ParsePayloadMethod(t *testing.T) {
assert := assert.New(t)

const yamlConfig = `
name: providerProxy
kind: ProviderProxy
urls:
- https://eth.llamarpc.com
`
proxy := newTestProviderProxy(yamlConfig, assert)

method := proxy.ParsePayloadMethod([]byte("{\"method\":\"eth_blockNumber\",\"params\":[],\"id\":1,\"jsonrpc\":\"2.0\"}"))
assert.Equal("eth_blockNumber", method)

method = proxy.ParsePayloadMethod([]byte("{\"method\":\"eth_getBlockByNumber\",\"params\":[\"0xc5043f\",false],\"id\":1,\"jsonrpc\":\"2.0\"}"))
assert.Equal("eth_getBlockByNumber", method)

method = proxy.ParsePayloadMethod([]byte("test unknown payload"))
assert.Equal("UNKNOWN", method)

method = proxy.ParsePayloadMethod([]byte{})
assert.Equal("UNKNOWN", method)

method = proxy.ParsePayloadMethod([]byte("{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"suix_getAllBalances\",\"params\":[\"0x94f1a597b4e8f709a396f7f6b1482bdcd65a673d111e49286c527fab7c2d0961\"]}"))
assert.Equal("suix_getAllBalances", method)

proxy.Close()
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,9 @@ func newMetrics(super *supervisor.Supervisor) *metrics {
commonLabels := prometheus.Labels{
"pipelineName": super.Options().Name,
"kind": "BlockLagProviderSelector",
"clusterName": super.Options().ClusterName,
"clusterRole": super.Options().ClusterRole,
"instanceName": super.Options().Name,
}
prometheusLabels := []string{
"clusterName", "clusterRole", "instanceName", "pipelineName", "kind",
"provider",
"pipelineName", "kind", "provider",
}

return &metrics{
Expand Down

0 comments on commit b84615a

Please sign in to comment.