Skip to content

Commit

Permalink
feat(io): rest support formdata
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying committed Dec 18, 2024
1 parent 11e3215 commit e9e1e51
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 14 deletions.
24 changes: 13 additions & 11 deletions internal/io/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,16 @@ type RefreshTokenConf struct {
}

type RawConf struct {
Url string `json:"url"`
Method string `json:"method"`
Body string `json:"body"`
BodyType string `json:"bodyType"`
Format string `json:"format"`
Headers map[string]string `json:"headers"`
Timeout cast.DurationConf `json:"timeout"`
Incremental bool `json:"incremental"`
Url string `json:"url"`
Method string `json:"method"`
Body string `json:"body"`
BodyType string `json:"bodyType"`
Format string `json:"format"`
Headers map[string]string `json:"headers"`
FormData map[string]string `json:"formData"`
FileFieldName string `json:"fileFieldName"`
Timeout cast.DurationConf `json:"timeout"`
Incremental bool `json:"incremental"`

OAuth map[string]map[string]interface{} `json:"oauth"`
SendSingle bool `json:"sendSingle"`
Expand All @@ -91,7 +93,7 @@ type bodyResp struct {
Code int `json:"code"`
}

var bodyTypeMap = map[string]string{"none": "", "text": "text/plain", "json": "application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": "", "binary": "application/octet-stream"}
var bodyTypeMap = map[string]string{"none": "", "text": "text/plain", "json": "application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": "", "binary": "application/octet-stream", "formdata": "multipart/form-data"}

// newTransport allows EdgeX Foundry, protected by OpenZiti to override and obtain a transport
// protected by OpenZiti's zero trust connectivity. See client_edgex.go where this function is
Expand Down Expand Up @@ -141,13 +143,13 @@ func (cc *ClientConf) InitConf(device string, props map[string]interface{}) erro
if _, ok2 := bodyTypeMap[strings.ToLower(c.BodyType)]; ok2 {
c.BodyType = strings.ToLower(c.BodyType)
} else {
return fmt.Errorf("Not valid body type value %v.", c.BodyType)
return fmt.Errorf("Invalid body type value %v.", c.BodyType)
}
switch c.ResponseType {
case "code", "body":
// correct
default:
return fmt.Errorf("Not valid response type value %v.", c.ResponseType)
return fmt.Errorf("Invalid response type value %v.", c.ResponseType)
}
err := httpx.IsHttpUrl(c.Url)
if err != nil {
Expand Down
15 changes: 12 additions & 3 deletions internal/io/http/rest_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (r *RestSink) Collect(ctx api.StreamContext, item api.RawTuple) error {
headers := r.config.Headers
bodyType := r.config.BodyType
method := r.config.Method
formData := r.config.FormData
u := r.config.Url

if dp, ok := item.(api.HasDynamicProps); ok {
Expand All @@ -85,6 +86,14 @@ func (r *RestSink) Collect(ctx api.StreamContext, item api.RawTuple) error {
if ok {
u = nu
}
if bodyType == "formdata" {
for k, v := range formData {
nv, ok := dp.DynamicProps(v)
if ok {
formData[k] = nv
}
}
}
}

switch r.config.Compression {
Expand All @@ -100,7 +109,7 @@ func (r *RestSink) Collect(ctx api.StreamContext, item api.RawTuple) error {
headers["Content-Encoding"] = "gzip"
}

resp, err := httpx.Send(ctx.GetLogger(), r.client, bodyType, method, u, headers, item.Raw())
resp, err := httpx.SendWithFormData(ctx.GetLogger(), r.client, bodyType, method, u, headers, formData, r.config.FileFieldName, item.Raw())
failpoint.Inject("recoverAbleErr", func() {
err = errors.New("connection reset by peer")
})
Expand All @@ -115,10 +124,10 @@ func (r *RestSink) Collect(ctx api.StreamContext, item api.RawTuple) error {
method,
u, string(item.Raw())))
}
return fmt.Errorf(`rest sink fails to send out the data:err=%s recoverAble=%v method=%s path="%s" request_body="%s"`,
return fmt.Errorf(`rest sink fails to send out the data:err=%s recoverAble=%v method=%s path="%s"`,
originErr.Error(),
recoverAble,
method, u, string(item.Raw()))
method, u)
} else {
logger.Debugf("rest sink got response %v", resp)
_, b, err := r.parseResponse(ctx, resp, "", r.config.DebugResp, false)
Expand Down
42 changes: 42 additions & 0 deletions internal/pkg/httpx/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,29 @@ import (
"errors"
"fmt"
"io"
"mime/multipart"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/pingcap/failpoint"

"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/pkg/timex"
)

var BodyTypeMap = map[string]string{"none": "", "text": "text/plain", "json": "application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": "application/x-www-form-urlencoded;param=value"}

// Send v must be a []byte or map
func Send(logger api.Logger, client *http.Client, bodyType string, method string, u string, headers map[string]string, v any) (*http.Response, error) {
return SendWithFormData(logger, client, bodyType, method, u, headers, nil, "", v)
}

func SendWithFormData(logger api.Logger, client *http.Client, bodyType string, method string, u string, headers map[string]string, formData map[string]string, formFieldName string, v any) (*http.Response, error) {
var req *http.Request
var err error
switch bodyType {
Expand Down Expand Up @@ -70,6 +77,41 @@ func Send(logger api.Logger, client *http.Client, bodyType string, method string
if req.Header.Get("Content-Type") == "" {
req.Header.Set("Content-Type", BodyTypeMap[bodyType])
}
case "formdata":
var requestBody bytes.Buffer
writer := multipart.NewWriter(&requestBody)
fileField, err := writer.CreateFormFile(formFieldName, strconv.FormatInt(timex.GetNowInMilli(), 10))
if err != nil {
return nil, fmt.Errorf("fail to create file field: %v", err)
}
var payload io.Reader
switch t := v.(type) {
case []byte:
payload = bytes.NewBuffer(t)
case string:
payload = bytes.NewBufferString(t)
default:
return nil, fmt.Errorf("http send only supports bytes but receive invalid content: %v", v)
}
_, err = io.Copy(fileField, payload)
if err != nil {
return nil, fmt.Errorf("fail to copy payload to file field: %v", err)
}
for k, v := range formData {
err := writer.WriteField(k, v)
if err != nil {
logger.Errorf("fail write form data field %s: %v", k, err)
}
}
err = writer.Close()
if err != nil {
logger.Errorf("fail to close writer: %v", err)
}
req, err = http.NewRequest(method, u, &requestBody)
if err != nil {
return nil, fmt.Errorf("fail to create request: %v", err)
}
req.Header.Set("Content-Type", writer.FormDataContentType())
default:
return nil, fmt.Errorf("unsupported body type %s", bodyType)
}
Expand Down

0 comments on commit e9e1e51

Please sign in to comment.