Skip to content

Commit

Permalink
support method metrics (#3)
Browse files Browse the repository at this point in the history
* feat: add endpoint metrics

* feat: add upgrade.sh

* feat: add rpc method prometheus label

* fix: parse batch request json
  • Loading branch information
AlanViast authored Oct 22, 2024
1 parent ac6a86e commit 605aa9c
Show file tree
Hide file tree
Showing 8 changed files with 453 additions and 50 deletions.
59 changes: 59 additions & 0 deletions pkg/filters/proxies/providerproxy/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package providerproxy

import (
"strconv"
"time"

"github.com/megaease/easegress/v2/pkg/util/prometheushelper"
"github.com/prometheus/client_golang/prometheus"
)

type (
metrics struct {
TotalRequests *prometheus.CounterVec
RequestsDuration prometheus.ObserverVec
}

RequestMetrics struct {
Provider string
RpcMethod []string
StatusCode int
Duration time.Duration
}
)

func (m *ProviderProxy) newMetrics() *metrics {
commonLabels := prometheus.Labels{
"pipelineName": m.Name(),
"kind": Kind,
}
prometheusLabels := []string{
"pipelineName", "kind", "policy", "statusCode", "provider", "rpcMethod",
}

return &metrics{
TotalRequests: prometheushelper.NewCounter(
"providerproxy_total_requests",
"the total count of http requests", prometheusLabels).MustCurryWith(commonLabels),
RequestsDuration: prometheushelper.NewHistogram(
prometheus.HistogramOpts{
Name: "providerproxy_requests_duration",
Help: "request processing duration histogram of a backend",
Buckets: prometheushelper.DefaultDurationBuckets(),
}, prometheusLabels).MustCurryWith(commonLabels),
}
}

func (m *ProviderProxy) collectMetrics(requestMetrics RequestMetrics) {
for _, method := range requestMetrics.RpcMethod {
labels := prometheus.Labels{
"policy": m.spec.Policy,
"statusCode": strconv.Itoa(requestMetrics.StatusCode),
"provider": requestMetrics.Provider,
"rpcMethod": method,
}

m.metrics.TotalRequests.With(labels).Inc()
m.metrics.RequestsDuration.With(labels).Observe(float64(requestMetrics.Duration.Milliseconds() / int64(len(requestMetrics.RpcMethod))))
}
}
108 changes: 89 additions & 19 deletions pkg/filters/proxies/providerproxy/providerproxy.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,35 @@
/*
* Copyright (c) 2017, The Easegress Authors
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package providerproxy

import (
"encoding/json"
"errors"
"math/rand"
"net/http"
"net/url"

"github.com/megaease/easegress/v2/pkg/context"
"github.com/megaease/easegress/v2/pkg/filters"
"github.com/megaease/easegress/v2/pkg/filters/proxies/providerproxy/selector"
"github.com/megaease/easegress/v2/pkg/logger"
"github.com/megaease/easegress/v2/pkg/protocols/httpprot"
"github.com/megaease/easegress/v2/pkg/supervisor"
"github.com/megaease/easegress/v2/pkg/util/fasttime"
)

const (
Expand All @@ -19,9 +39,11 @@ const (

type (
ProviderProxy struct {
super *supervisor.Supervisor
spec *Spec
client *http.Client
providerSelector *ProviderSelector
providerSelector selector.ProviderSelector
metrics *metrics
}

Spec struct {
Expand All @@ -30,33 +52,76 @@ type (
Urls []string `yaml:"urls"`
Interval string `yaml:"interval,omitempty" jsonschema:"format=duration"`
Lag uint64 `yaml:"lag,omitempty" jsonschema:"default=100"`
Policy string `yaml:"policy,omitempty" jsonschema:"default=roundRobin"`
}
)

func (m *ProviderProxy) SelectNode() (*url.URL, error) {
if m.providerSelector == nil {
urls := m.spec.Urls
randomIndex := rand.Intn(len(urls))
rpcUrl := urls[randomIndex]
return url.Parse(rpcUrl)
}
rpcUrl, err := m.providerSelector.ChooseServer()
if err != nil {
return nil, err
}
return url.Parse(rpcUrl)
}

func (m *ProviderProxy) ParsePayloadMethod(payload []byte) []string {
defaultValue := []string{"UNKNOWN"}
if len(payload) <= 0 {
return defaultValue
}

jsonBody := map[string]interface{}{}
err := json.Unmarshal(payload, &jsonBody)
if err == nil {
method, exists := jsonBody["method"].(string)
if !exists {
return defaultValue
}
return []string{method}
}

// parse batch call json array
var jsonBodyArr []map[string]interface{}
err = json.Unmarshal(payload, &jsonBodyArr)
if err != nil {
logger.Errorf("parse batch call err: %s, Body: %s", err, string(payload))
return defaultValue
}

methods := make([]string, 0)

for _, item := range jsonBodyArr {
method, exists := item["method"].(string)
if !exists {
methods = append(methods, "UNKNOWN")
}
methods = append(methods, method)
}
return methods
}

func (m *ProviderProxy) Handle(ctx *context.Context) (result string) {
requestMetrics := RequestMetrics{}

startTime := fasttime.Now()
reqUrl, err := m.SelectNode()
if err != nil {
logger.Errorf(err.Error())
return err.Error()
}

logger.Infof("select rpc provider: %s", reqUrl.String())
requestMetrics.Provider = reqUrl.String()
req := ctx.GetInputRequest().(*httpprot.Request)

requestMetrics.RpcMethod = m.ParsePayloadMethod(req.RawPayload())

forwardReq, err := http.NewRequestWithContext(req.Context(), req.Method(), reqUrl.String(), req.GetPayload())
if err != nil {
logger.Errorf(err.Error())
return err.Error()
}

for key := range req.HTTPHeader() {
forwardReq.Header.Add(key, req.HTTPHeader().Get(key))
}
Expand All @@ -67,17 +132,20 @@ func (m *ProviderProxy) Handle(ctx *context.Context) (result string) {
return err.Error()
}

requestMetrics.Duration = fasttime.Since(startTime)
requestMetrics.StatusCode = response.StatusCode
defer m.collectMetrics(requestMetrics)

outputResponse, err := httpprot.NewResponse(response)
outputResponse.Body = response.Body
if err != nil {
return err.Error()
}

if err = outputResponse.FetchPayload(1024 * 1024); err != nil {
if err = outputResponse.FetchPayload(-1); err != nil {
logger.Errorf("%s: failed to fetch response payload: %v, please consider to set serverMaxBodySize of SimpleHTTPProxy to -1.", m.Name(), err)
return err.Error()
}

ctx.SetResponse(context.DefaultNamespace, outputResponse)
return ""
}
Expand All @@ -90,12 +158,14 @@ var kind = &filters.Kind{
return &Spec{
Urls: make([]string, 0),
Interval: "1s",
Policy: "roundRobin",
}
},
CreateInstance: func(spec filters.Spec) filters.Filter {
providerSpec := spec.(*Spec)
return &ProviderProxy{
spec: providerSpec,
super: spec.Super(),
client: http.DefaultClient,
}
},
Expand Down Expand Up @@ -130,16 +200,16 @@ func (m *ProviderProxy) reload() {
client := http.DefaultClient
m.client = client

if len(m.spec.Urls) > 1 {
providerSelectorSpec := ProviderSelectorSpec{
Urls: m.spec.Urls,
Interval: m.spec.Interval,
Lag: m.spec.Lag,
}

providerSelector := NewProviderSelector(providerSelectorSpec)
m.providerSelector = &providerSelector
providerSelectorSpec := selector.ProviderSelectorSpec{
Urls: m.spec.Urls,
Interval: m.spec.Interval,
Lag: m.spec.Lag,
}

m.metrics = m.newMetrics()

providerSelector := selector.CreateProviderSelectorByPolicy(m.spec.Policy, providerSelectorSpec, m.super)
m.providerSelector = providerSelector
}

// Status returns status.
Expand Down
65 changes: 59 additions & 6 deletions pkg/filters/proxies/providerproxy/providerproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
package providerproxy

import (
"bytes"
"fmt"
"net/http"
"os"
"strings"
"testing"

"github.com/megaease/easegress/v2/pkg/context"
"github.com/megaease/easegress/v2/pkg/filters"
"github.com/megaease/easegress/v2/pkg/logger"
"github.com/megaease/easegress/v2/pkg/option"
"github.com/megaease/easegress/v2/pkg/protocols/httpprot"
"github.com/megaease/easegress/v2/pkg/supervisor"
"github.com/megaease/easegress/v2/pkg/tracing"
"github.com/megaease/easegress/v2/pkg/util/codectool"
"github.com/stretchr/testify/assert"
Expand All @@ -39,11 +42,25 @@ func TestMain(m *testing.M) {
}

func newTestProviderProxy(yamlConfig string, assert *assert.Assertions) *ProviderProxy {
defer func() {
if err := recover(); err != nil {
fmt.Printf("Recovered from panic: %v\n", err)
}
}()

rawSpec := make(map[string]interface{})
err := codectool.Unmarshal([]byte(yamlConfig), &rawSpec)
assert.NoError(err)

spec, err := filters.NewSpec(nil, "", rawSpec)
opt := option.New()
opt.Name = "test"
opt.ClusterName = "test"
opt.ClusterRole = "secondary"

super := supervisor.NewMock(opt, nil, nil,
nil, false, nil, nil)

spec, err := filters.NewSpec(super, "", rawSpec)
assert.NoError(err)

proxy := kind.CreateInstance(spec).(*ProviderProxy)
Expand All @@ -61,7 +78,10 @@ func getCtx(stdr *http.Request) *context.Context {
req.HTTPHeader().Set(key, stdr.Header.Get(key))
}

_ = req.FetchPayload(1024 * 1024)
err := req.FetchPayload(1024 * 1024)
if err != nil {
logger.Errorf(err.Error())
}
ctx := context.New(tracing.NoopSpan)
ctx.SetRequest(context.DefaultNamespace, req)
return ctx
Expand All @@ -74,17 +94,50 @@ func TestProviderProxy(t *testing.T) {
name: providerProxy
kind: ProviderProxy
urls:
- https://ethereum-mainnet.s.chainbase.online
- https://eth.llamarpc.com
`
proxy := newTestProviderProxy(yamlConfig, assert)

postData := "{\"method\":\"eth_blockNumber\",\"params\":[],\"id\":1,\"jsonrpc\":\"2.0\"}"

stdr, _ := http.NewRequest(http.MethodPost, "https://www.megaease.com", bytes.NewReader([]byte(postData)))
stdr, _ := http.NewRequest(http.MethodPost, "https://www.megaease.com", strings.NewReader(postData))
stdr.Header.Set("Content-Type", "application/json")
ctx := getCtx(stdr)
response := proxy.Handle(ctx)
assert.Equal("", response)
assert.NotNil(string(ctx.GetOutputResponse().RawPayload()))
assert.NotNil(ctx.GetResponse(context.DefaultNamespace).GetPayload())

proxy.Close()
}

func TestProviderProxy_ParsePayloadMethod(t *testing.T) {
assert := assert.New(t)

const yamlConfig = `
name: providerProxy
kind: ProviderProxy
urls:
- https://eth.llamarpc.com
`
proxy := newTestProviderProxy(yamlConfig, assert)

method := proxy.ParsePayloadMethod([]byte("{\"method\":\"eth_blockNumber\",\"params\":[],\"id\":1,\"jsonrpc\":\"2.0\"}"))
assert.Equal([]string{"eth_blockNumber"}, method)

method = proxy.ParsePayloadMethod([]byte("{\"method\":\"eth_getBlockByNumber\",\"params\":[\"0xc5043f\",false],\"id\":1,\"jsonrpc\":\"2.0\"}"))
assert.Equal([]string{"eth_getBlockByNumber"}, method)

method = proxy.ParsePayloadMethod([]byte("test unknown payload"))
assert.Equal([]string{"UNKNOWN"}, method)

method = proxy.ParsePayloadMethod([]byte{})
assert.Equal([]string{"UNKNOWN"}, method)

method = proxy.ParsePayloadMethod([]byte("{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"suix_getAllBalances\",\"params\":[\"0x94f1a597b4e8f709a396f7f6b1482bdcd65a673d111e49286c527fab7c2d0961\"]}"))
assert.Equal([]string{"suix_getAllBalances"}, method)

method = proxy.ParsePayloadMethod([]byte("[{\"jsonrpc\": \"2.0\", \"method\": \"eth_getTransactionReceipt\", \"params\": [\"0x7363bf80269875c6ddd3de0089baf0a9af28586dd0e536753d1cbb5eb9d6535b\"], \"id\": 0}, {\"jsonrpc\": \"2.0\", \"method\": \"eth_getTransactionReceipt\", \"params\": [\"0x29696eba0fa0eb5eb4c1495174c6fb37a9e64a3707dc8b64c9d19a650c8e1b5f\"], \"id\": 1}, {\"jsonrpc\": \"2.0\", \"method\": \"eth_getTransactionReceipt\", \"params\": [\"0x8f9631d0fd6a056e422ed0eea938a84b1eca3344729789f4465861c6406c2114\"], \"id\": 2}, {\"jsonrpc\": \"2.0\", \"method\": \"eth_getTransactionReceipt\", \"params\": [\"0x4787c3d707ff6d204590c77f672917dca49c36ec0381d6dea10d84981d008ec5\"], \"id\": 3}, {\"jsonrpc\": \"2.0\", \"method\": \"eth_getTransactionReceipt\", \"params\": [\"0x11132ec86262aa67b27f64a5f5a6550c7b86f85dabe3d824dbab0f20283f9aa6\"], \"id\": 4}, {\"jsonrpc\": \"2.0\", \"method\": \"eth_getTransactionReceipt\", \"params\": [\"0x8efac98473a7cdb5c40ca503089794b06723c09791833c2df5b4732ea5a10451\"], \"id\": 5}, {\"jsonrpc\": \"2.0\", \"method\": \"eth_getTransactionReceipt\", \"params\": [\"0x7df0d4c1b2d1a570a1367a8bd09301f8e6af5ffa03a2b97a693ba440bb87b002\"], \"id\": 6}, {\"jsonrpc\": \"2.0\", \"method\": \"eth_getTransactionReceipt\", \"params\": [\"0x7d473c856b0b8d207c43e46a3327217e809142ad5497109d01ec72d6a1bde45c\"], \"id\": 7}, {\"jsonrpc\": \"2.0\", \"method\": \"eth_getTransactionReceipt\", \"params\": [\"0x512013ff4a44b6604b81f63e330d57a63b03f23dae011753be8c168ce8f6fcc7\"], \"id\": 8}, {\"jsonrpc\": \"2.0\", \"method\": \"eth_getTransactionReceipt\", \"params\": [\"0x438abc4bfc9a46296f191ad3ecc742bbd7da9286a2a92fe8f27f2d0168a19661\"], \"id\": 9}]"))
assert.Equal([]string{"eth_getTransactionReceipt", "eth_getTransactionReceipt", "eth_getTransactionReceipt", "eth_getTransactionReceipt", "eth_getTransactionReceipt", "eth_getTransactionReceipt", "eth_getTransactionReceipt", "eth_getTransactionReceipt", "eth_getTransactionReceipt", "eth_getTransactionReceipt"}, method)

proxy.Close()
}
Loading

0 comments on commit 605aa9c

Please sign in to comment.