Skip to content

Commit

Permalink
增加自定义decoder的支持并优化decode期间ReadAll的速度 (#384)
Browse files Browse the repository at this point in the history
* 增加自定义decoder的支持并优化decode期间ReadAll的速度

* 修正版本兼容

* 修正版本兼容
  • Loading branch information
asjdf authored Mar 21, 2024
1 parent 6a9cbe6 commit 012ba1a
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 8 deletions.
6 changes: 6 additions & 0 deletions dataflow/dataflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,12 @@ func (df *DataFlow) BindXML(obj interface{}) *DataFlow {
return df
}

// BindDecoder allow user parse data by their own decoder
func (df *DataFlow) BindDecoder(decode decode.Decoder) *DataFlow {
df.Req.bodyDecoder = append(df.Req.bodyDecoder, decode)
return df
}

// Code parse the http code into the variable httpCode
func (df *DataFlow) Code(httpCode *int) *DataFlow {
df.Req.httpCode = httpCode
Expand Down
54 changes: 46 additions & 8 deletions dataflow/req.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dataflow
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -380,16 +381,19 @@ func (r *Req) GetContext() context.Context {
// TODO 优化代码,每个decode都有自己的指针偏移直接指向流,减少大body的内存使用
func (r *Req) decodeBody(req *http.Request, resp *http.Response) (err error) {
if r.bodyDecoder != nil {
var all []byte
if len(r.bodyDecoder) > 1 {
all, err = ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
// 已经取走数据,直接关闭body
resp.Body.Close()
// 当只有一个解码器时,直接在流上操作,避免读取整个响应体
if len(r.bodyDecoder) == 1 {
defer resp.Body.Close() // 确保在读取完成后关闭body
return r.bodyDecoder[0].Decode(resp.Body)
}

// 当有多个解码器需要处理响应体时,才读取整个响应体到内存中
all, err := ReadAll(resp)
if err != nil {
return err
}
resp.Body.Close() // 已经取走数据,直接关闭body

for _, bodyDecoder := range r.bodyDecoder {
if len(all) > 0 {
resp.Body = ioutil.NopCloser(bytes.NewReader(all))
Expand Down Expand Up @@ -598,3 +602,37 @@ func reqDef(method string, url string, g *Gout, urlStruct ...interface{}) Req {

return r
}

// ReadAll returns the whole response body as bytes.
// This is an optimized version of `io.ReadAll`.
func ReadAll(resp *http.Response) ([]byte, error) {
if resp == nil {
return nil, errors.New("response cannot be nil")
}
switch {
case resp.ContentLength == 0:
return []byte{}, nil
// if we know the body length we can allocate the buffer only once
case resp.ContentLength >= 0:
body := make([]byte, resp.ContentLength)
_, err := io.ReadFull(resp.Body, body)
if err != nil {
return nil, fmt.Errorf("failed to read the response body with a known length %d: %w", resp.ContentLength, err)
}
return body, nil

default:
// using `bytes.NewBuffer` + `io.Copy` is much faster than `io.ReadAll`
// see https://github.com/elastic/beats/issues/36151#issuecomment-1931696767
buf := bytes.NewBuffer(nil)
_, err := io.Copy(buf, resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read the response body with unknown length: %w", err)
}
body := buf.Bytes()
if body == nil {
body = []byte{}
}
return body, nil
}
}
116 changes: 116 additions & 0 deletions dataflow/req_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package dataflow

import (
"bytes"
"fmt"
"github.com/stretchr/testify/require"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
Expand Down Expand Up @@ -90,3 +95,114 @@ func Test_Valid(t *testing.T) {
assert.Error(t, err)
}
}

func TestReadAll(t *testing.T) {
size := 100
body := bytes.Repeat([]byte{'a'}, size)
cases := []struct {
name string
resp *http.Response
expBody []byte
}{
{
name: "reads known size",
resp: &http.Response{
ContentLength: int64(size),
Body: ioutil.NopCloser(bytes.NewBuffer(body)),
},
expBody: body,
},
{
name: "reads unknown size",
resp: &http.Response{
ContentLength: -1,
Body: ioutil.NopCloser(bytes.NewBuffer(body)),
},
expBody: body,
},
{
name: "supports empty with size=0",
resp: &http.Response{
ContentLength: 0,
Body: ioutil.NopCloser(bytes.NewBuffer(nil)),
},
expBody: []byte{},
},
{
name: "supports empty with unknown size",
resp: &http.Response{
ContentLength: -1,
Body: ioutil.NopCloser(bytes.NewBuffer(nil)),
},
expBody: []byte{},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
actBody, err := ReadAll(tc.resp)
require.NoError(t, err)
require.Equal(t, tc.expBody, actBody)
})
}
}

func BenchmarkReadAll(b *testing.B) {
sizes := []int{
100, // 100 bytes
100 * 1024, // 100KB
1024 * 1024, // 1MB
}
for _, size := range sizes {
b.Run(fmt.Sprintf("size: %d", size), func(b *testing.B) {

// emulate a file or an HTTP response
generated := bytes.Repeat([]byte{'a'}, size)
content := bytes.NewReader(generated)
cases := []struct {
name string
resp *http.Response
}{
{
name: "unknown length",
resp: &http.Response{
ContentLength: -1,
Body: ioutil.NopCloser(content),
},
},
{
name: "known length",
resp: &http.Response{
ContentLength: int64(size),
Body: ioutil.NopCloser(content),
},
},
}

b.ResetTimer()

for _, tc := range cases {
b.Run(tc.name, func(b *testing.B) {
b.Run("io.ReadAll", func(b *testing.B) {
for i := 0; i < b.N; i++ {
_, err := content.Seek(0, io.SeekStart) // reset
require.NoError(b, err)
data, err := ioutil.ReadAll(tc.resp.Body)
require.NoError(b, err)
require.Equalf(b, size, len(data), "size does not match, expected %d, actual %d", size, len(data))
}
})
b.Run("bytes.Buffer+io.Copy", func(b *testing.B) {
for i := 0; i < b.N; i++ {
_, err := content.Seek(0, io.SeekStart) // reset
require.NoError(b, err)
data, err := ReadAll(tc.resp)
require.NoError(b, err)
require.Equalf(b, size, len(data), "size does not match, expected %d, actual %d", size, len(data))
}
})
})
}
})
}
}

0 comments on commit 012ba1a

Please sign in to comment.