Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: webhook v2 spec upgrade #5224

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 37 additions & 8 deletions gateway/webhook/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/url"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"testing"
Expand Down Expand Up @@ -44,6 +45,7 @@ import (
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/gateway"

"github.com/rudderlabs/rudder-go-kit/requesttojson"
"github.com/rudderlabs/rudder-transformer/go/webhook/testcases"
)

Expand Down Expand Up @@ -321,19 +323,46 @@ func TestIntegrationWebhook(t *testing.T) {
require.NoError(t, err)
assert.Len(t, r.Jobs, len(tc.Output.ErrQueue))
for i, p := range tc.Output.ErrQueue {
errPayload, err := json.Marshal(struct {
Event json.RawMessage `json:"event"`
Source backendconfig.SourceT `json:"source"`
}{
Source: sConfig,
Event: bytes.ReplaceAll(p, []byte(`{{.WriteKey}}`), []byte(sConfig.WriteKey)),
})
var errPayload []byte
// expected error payload stored in errDB is dependant on the webhook transformation version
if webhookVersion == "v1" {
errPayload, err = json.Marshal(struct {
Event json.RawMessage `json:"event"`
Source backendconfig.SourceT `json:"source"`
}{
Source: sConfig,
Event: bytes.ReplaceAll(p, []byte(`{{.WriteKey}}`), []byte(sConfig.WriteKey)),
})
} else {
var requestPayload *requesttojson.RequestJSON
var requestPayloadBytes []byte

// set defaults assigned by go http client
req.Body = io.NopCloser(bytes.NewReader(p))
req.Method = "POST"
req.Proto = "HTTP/1.1"
req.Header.Set("Accept-Encoding", "gzip")
req.Header.Set("Content-Length", strconv.Itoa(len(p)))
req.Header.Set("User-Agent", "Go-http-client/1.1")

requestPayload, err = requesttojson.RequestToJSON(req, "{}")
requestPayloadBytes, err = json.Marshal(requestPayload)

errPayload, err = json.Marshal(struct {
Request json.RawMessage `json:"request"`
Source backendconfig.SourceT `json:"source"`
}{
Source: sConfig,
// Event: bytes.ReplaceAll(p, []byte(`{{.WriteKey}}`), []byte(sConfig.WriteKey)),
Request: requestPayloadBytes,
})
}
require.NoError(t, err)
errPayload, err = sjson.SetBytes(errPayload, "source.Destinations", nil)
require.NoError(t, err)

errPayloadWriteKey := gjson.GetBytes(p, "query_parameters.writeKey").Value()
if errPayloadWriteKey != nil {
if errPayloadWriteKey != nil && webhookVersion == "v1" {
r.Jobs[i].EventPayload, err = sjson.SetBytes(r.Jobs[i].EventPayload, "event.query_parameters.writeKey", errPayloadWriteKey)
require.NoError(t, err)
}
Expand Down
41 changes: 12 additions & 29 deletions gateway/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"mime/multipart"
"net/http"
"net/url"
"slices"
"strconv"
"strings"
"sync"
Expand All @@ -18,7 +17,6 @@ import (

"github.com/hashicorp/go-retryablehttp"
"github.com/samber/lo"
"github.com/tidwall/sjson"

"github.com/rudderlabs/rudder-go-kit/config"
kithttputil "github.com/rudderlabs/rudder-go-kit/httputil"
Expand All @@ -28,6 +26,7 @@ import (
gwtypes "github.com/rudderlabs/rudder-server/gateway/internal/types"
"github.com/rudderlabs/rudder-server/gateway/response"
"github.com/rudderlabs/rudder-server/gateway/webhook/model"
"github.com/rudderlabs/rudder-server/services/transformer"
)

type webhookT struct {
Expand Down Expand Up @@ -277,29 +276,6 @@ func (webhook *HandleT) batchRequests(sourceDef string, requestQ chan *webhookT)
}
}

func prepareRequestBody(req *http.Request, sourceType string, sourceListForParsingParams []string) ([]byte, error) {
defer func() {
_ = req.Body.Close()
}()

body, err := io.ReadAll(req.Body)
if err != nil {
return nil, errors.New(response.RequestBodyReadFailed)
}

if len(body) == 0 {
body = []byte("{}") // If body is empty, set it to an empty JSON object
}

if slices.Contains(sourceListForParsingParams, strings.ToLower(sourceType)) {
queryParams := req.URL.Query()

return sjson.SetBytes(body, "query_parameters", queryParams)
}

return body, nil
}

// TODO : return back immediately for blank request body. its waiting till timeout
func (bt *batchWebhookTransformerT) batchTransformLoop() {
for breq := range bt.webhook.batchRequestQ {
Expand Down Expand Up @@ -333,15 +309,22 @@ func (bt *batchWebhookTransformerT) batchTransformLoop() {
var webRequests []*webhookT
for _, req := range breq.batchRequest {
var payload []byte
body, err := prepareRequestBody(req.request, breq.sourceType, bt.webhook.config.sourceListForParsingParams)
if err == nil && !json.Valid(body) {
var eventRequest []byte

if sourceTransformAdapter.getAdapterVersion() == transformer.V1 {
eventRequest, err = prepareTransformerEventRequestV1(req.request, breq.sourceType, bt.webhook.config.sourceListForParsingParams)
} else {
eventRequest, err = prepareTransformerEventRequestV2(req.request)
}

if err == nil && !json.Valid(eventRequest) {
err = errors.New(response.InvalidJSON)
}
if err == nil && len(body) > bt.webhook.config.maxReqSize.Load() {
if err == nil && len(eventRequest) > bt.webhook.config.maxReqSize.Load() {
err = errors.New(response.RequestBodyTooLarge)
}
if err == nil {
payload, err = sourceTransformAdapter.getTransformerEvent(req.authContext, body)
payload, err = sourceTransformAdapter.getTransformerEvent(req.authContext, eventRequest)
}

if err != nil {
Expand Down
99 changes: 94 additions & 5 deletions gateway/webhook/webhookTransformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ import (
"io"
"net/http"
"net/url"
"slices"
"strings"
"time"

"github.com/tidwall/sjson"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/requesttojson"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
gwtypes "github.com/rudderlabs/rudder-server/gateway/internal/types"
"github.com/rudderlabs/rudder-server/gateway/response"
Expand All @@ -21,22 +25,25 @@ import (
)

type sourceTransformAdapter interface {
getTransformerEvent(authCtx *gwtypes.AuthRequestContext, body []byte) ([]byte, error)
getTransformerEvent(authCtx *gwtypes.AuthRequestContext, eventRequest []byte) ([]byte, error)
getTransformerURL(sourceType string) (string, error)
getAdapterVersion() string
}

// ----- v1 adapter ---------

type v1Adapter struct{}

type V1TransformerEvent struct {
Event json.RawMessage `json:"event"`
Source backendconfig.SourceT `json:"source"`
EventRequest json.RawMessage `json:"event"`
Source backendconfig.SourceT `json:"source"`
}

func (v1 *v1Adapter) getTransformerEvent(authCtx *gwtypes.AuthRequestContext, body []byte) ([]byte, error) {
func (v1 *v1Adapter) getTransformerEvent(authCtx *gwtypes.AuthRequestContext, eventRequest []byte) ([]byte, error) {
source := authCtx.Source

v1TransformerEvent := V1TransformerEvent{
Event: body,
EventRequest: eventRequest,
Source: backendconfig.SourceT{
ID: source.ID,
OriginalID: source.OriginalID,
Expand All @@ -57,16 +64,98 @@ func (v1 *v1Adapter) getTransformerURL(sourceType string) (string, error) {
return getTransformerURL(transformer.V1, sourceType)
}

func (v1 *v1Adapter) getAdapterVersion() string {
return transformer.V1
}

// ----- v2 adapter -----

type v2Adapter struct{}

type V2TransformerEvent struct {
EventRequest json.RawMessage `json:"request"`
Source backendconfig.SourceT `json:"source"`
}

func (v2 *v2Adapter) getTransformerEvent(authCtx *gwtypes.AuthRequestContext, eventRequest []byte) ([]byte, error) {
source := authCtx.Source

v2TransformerEvent := V2TransformerEvent{
EventRequest: eventRequest,
Source: backendconfig.SourceT{
ID: source.ID,
OriginalID: source.OriginalID,
Name: source.Name,
SourceDefinition: source.SourceDefinition,
Config: source.Config,
Enabled: source.Enabled,
WorkspaceID: source.WorkspaceID,
WriteKey: source.WriteKey,
Transient: source.Transient,
},
}

return json.Marshal(v2TransformerEvent)
}

func (v2 *v2Adapter) getTransformerURL(sourceType string) (string, error) {
return getTransformerURL(transformer.V2, sourceType)
}

func (v2 *v2Adapter) getAdapterVersion() string {
return transformer.V2
}

// ------------------------------

func newSourceTransformAdapter(version string) sourceTransformAdapter {
// V0 Deprecation: this function returns v1 adapter by default, thereby deprecating v0
if version == transformer.V2 {
return &v2Adapter{}
}
return &v1Adapter{}
}

// --- utilities -----

func getTransformerURL(version, sourceType string) (string, error) {
baseURL := config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090")
return url.JoinPath(baseURL, version, "sources", strings.ToLower(sourceType))
}

func prepareTransformerEventRequestV1(req *http.Request, sourceType string, sourceListForParsingParams []string) ([]byte, error) {
defer func() {
if req.Body != nil {
_ = req.Body.Close()
}
}()

body, err := io.ReadAll(req.Body)
if err != nil {
return nil, errors.New(response.RequestBodyReadFailed)
}

if len(body) == 0 {
body = []byte("{}") // If body is empty, set it to an empty JSON object
}

if slices.Contains(sourceListForParsingParams, strings.ToLower(sourceType)) {
queryParams := req.URL.Query()
return sjson.SetBytes(body, "query_parameters", queryParams)
}

return body, nil
}

func prepareTransformerEventRequestV2(req *http.Request) ([]byte, error) {
requestJson, err := requesttojson.RequestToJSON(req, "{}")
if err != nil {
return nil, err
}

return json.Marshal(requestJson)
}

type outputToSource struct {
Body []byte `json:"body"`
ContentType string `json:"contentType"`
Expand Down
51 changes: 49 additions & 2 deletions gateway/webhook/webhookTransformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ func TestV1Adapter(t *testing.T) {
require.True(t, strings.HasSuffix(url, fmt.Sprintf("/%s/sources/%s", transformer.V1, testSrcTypeLower)))
})

t.Run("should return the right adapter version", func(t *testing.T) {
v1Adapter := newSourceTransformAdapter(transformer.V1)
adapterVersion := v1Adapter.getAdapterVersion()
require.Equal(t, adapterVersion, transformer.V1)
})

t.Run("should return the body in v1 format", func(t *testing.T) {
testSrcId := "testSrcId"
testBody := []byte(`{"a": "testBody"}`)
Expand All @@ -39,11 +45,52 @@ func TestV1Adapter(t *testing.T) {
require.Nil(t, err)

v1TransformerEvent := V1TransformerEvent{
Event: testBody,
Source: backendconfig.SourceT{ID: mockSrc.ID},
EventRequest: testBody,
Source: backendconfig.SourceT{ID: mockSrc.ID},
}
expectedBody, err := json.Marshal(v1TransformerEvent)
require.Nil(t, err)
require.Equal(t, expectedBody, retBody)
})
}

func TestV2Adapter(t *testing.T) {
t.Run("should return the right url", func(t *testing.T) {
v2Adapter := newSourceTransformAdapter(transformer.V2)
testSrcType := "testSrcType"
testSrcTypeLower := "testsrctype"

url, err := v2Adapter.getTransformerURL(testSrcType)
require.Nil(t, err)
require.True(t, strings.HasSuffix(url, fmt.Sprintf("/%s/sources/%s", transformer.V2, testSrcTypeLower)))
})

t.Run("should return the right adapter version", func(t *testing.T) {
v1Adapter := newSourceTransformAdapter(transformer.V2)
adapterVersion := v1Adapter.getAdapterVersion()
require.Equal(t, adapterVersion, transformer.V2)
})

t.Run("should return the body in v2 format", func(t *testing.T) {
testSrcId := "testSrcId"
testBody := []byte(`{"a": "testBody"}`)

mockSrc := backendconfig.SourceT{
ID: testSrcId,
Destinations: []backendconfig.DestinationT{{ID: "testDestId"}},
}

v2Adapter := newSourceTransformAdapter(transformer.V2)

retBody, err := v2Adapter.getTransformerEvent(&gwtypes.AuthRequestContext{Source: mockSrc}, testBody)
require.Nil(t, err)

v2TransformerEvent := V2TransformerEvent{
EventRequest: testBody,
Source: backendconfig.SourceT{ID: mockSrc.ID},
}
expectedBody, err := json.Marshal(v2TransformerEvent)
require.Nil(t, err)
require.Equal(t, expectedBody, retBody)
})
}
Loading
Loading