Skip to content

Commit

Permalink
feat: support URL method forwarding & fix socket connection recycling
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanViast committed Oct 28, 2024
1 parent 605aa9c commit 89b9d30
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 5 deletions.
31 changes: 27 additions & 4 deletions pkg/filters/proxies/providerproxy/providerproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/megaease/easegress/v2/pkg/protocols/httpprot"
"github.com/megaease/easegress/v2/pkg/supervisor"
"github.com/megaease/easegress/v2/pkg/util/fasttime"
"github.com/megaease/easegress/v2/pkg/util/readers"
)

const (
Expand Down Expand Up @@ -100,6 +101,19 @@ func (m *ProviderProxy) ParsePayloadMethod(payload []byte) []string {
return methods
}

func (m *ProviderProxy) HandleRequest(req *httpprot.Request, providerUrl *url.URL) (forwardReq *http.Request, method []string, err error) {
if len(req.URL().Path) != 0 {
providerUrl = providerUrl.JoinPath(req.URL().Path)
method = []string{req.URL().Path}
} else {
bodyBytes := req.RawPayload()
method = m.ParsePayloadMethod(bodyBytes)
}

forwardReq, err = http.NewRequestWithContext(req.Context(), req.Method(), providerUrl.String(), req.GetPayload())
return
}

func (m *ProviderProxy) Handle(ctx *context.Context) (result string) {
requestMetrics := RequestMetrics{}

Expand All @@ -113,10 +127,8 @@ func (m *ProviderProxy) Handle(ctx *context.Context) (result string) {
logger.Infof("select rpc provider: %s", reqUrl.String())
requestMetrics.Provider = reqUrl.String()
req := ctx.GetInputRequest().(*httpprot.Request)
forwardReq, method, err := m.HandleRequest(req, reqUrl)

requestMetrics.RpcMethod = m.ParsePayloadMethod(req.RawPayload())

forwardReq, err := http.NewRequestWithContext(req.Context(), req.Method(), reqUrl.String(), req.GetPayload())
if err != nil {
logger.Errorf(err.Error())
return err.Error()
Expand All @@ -127,25 +139,36 @@ func (m *ProviderProxy) Handle(ctx *context.Context) (result string) {
}

response, err := m.client.Do(forwardReq)

if err != nil {
logger.Errorf(err.Error())
return err.Error()
}

requestMetrics.RpcMethod = method
requestMetrics.Duration = fasttime.Since(startTime)
requestMetrics.StatusCode = response.StatusCode
defer m.collectMetrics(requestMetrics)

body := readers.NewCallbackReader(response.Body)
response.Body = body
outputResponse, err := httpprot.NewResponse(response)
outputResponse.Body = response.Body

if err != nil {
logger.Errorf(err.Error())
response.Body.Close()
return err.Error()
}

if err = outputResponse.FetchPayload(-1); err != nil {
logger.Errorf("%s: failed to fetch response payload: %v, please consider to set serverMaxBodySize of SimpleHTTPProxy to -1.", m.Name(), err)
response.Body.Close()
return err.Error()
}

if !outputResponse.IsStream() {
response.Body.Close()
}
ctx.SetResponse(context.DefaultNamespace, outputResponse)
return ""
}
Expand Down
22 changes: 21 additions & 1 deletion pkg/filters/proxies/providerproxy/providerproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ urls:
- https://eth.llamarpc.com
`
proxy := newTestProviderProxy(yamlConfig, assert)
defer proxy.Close()

postData := "{\"method\":\"eth_blockNumber\",\"params\":[],\"id\":1,\"jsonrpc\":\"2.0\"}"

Expand All @@ -106,8 +107,27 @@ urls:
response := proxy.Handle(ctx)
assert.Equal("", response)
assert.NotNil(ctx.GetResponse(context.DefaultNamespace).GetPayload())
}

proxy.Close()
func TestURLProviderProxy(t *testing.T) {
assert := assert.New(t)

const yamlConfig = `
name: tron-providerProxy
kind: ProviderProxy
urls:
- https://docs-demo.tron-mainnet.quiknode.pro
`
proxy := newTestProviderProxy(yamlConfig, assert)
defer proxy.Close()

postData := "{\"id_or_num\":\"66484052\",\"detail\":false}"
stdr, _ := http.NewRequest(http.MethodPost, "https://rpc.tron.network/wallet/getblock", strings.NewReader(postData))
stdr.Header.Set("Content-Type", "application/json")
ctx := getCtx(stdr)
response := proxy.Handle(ctx)
assert.Equal("", response)
assert.NotNil(ctx.GetResponse(context.DefaultNamespace).GetPayload())
}

func TestProviderProxy_ParsePayloadMethod(t *testing.T) {
Expand Down

0 comments on commit 89b9d30

Please sign in to comment.