From 61ec3365940e3b51ba3337a70f6037144d33db62 Mon Sep 17 00:00:00 2001 From: Leonidas Vrachnis Date: Thu, 2 Jan 2025 22:24:01 +0100 Subject: [PATCH] add test for all clients --- processor/transformer/transformer.go | 2 +- processor/transformer/transformer_test.go | 1214 +++++++++++---------- 2 files changed, 612 insertions(+), 604 deletions(-) diff --git a/processor/transformer/transformer.go b/processor/transformer/transformer.go index 136b5ba2ee..0549781ef6 100644 --- a/processor/transformer/transformer.go +++ b/processor/transformer/transformer.go @@ -219,7 +219,7 @@ func NewTransformer(conf *config.Config, log logger.Logger, stat stats.Stats, op trans.guardConcurrency = make(chan struct{}, trans.config.maxConcurrency) - clientType := config.GetString("Transformer.Client.type", "stdlib") + clientType := conf.GetString("Transformer.Client.type", "stdlib") switch clientType { case "stdlib": diff --git a/processor/transformer/transformer_test.go b/processor/transformer/transformer_test.go index ea746dc44f..a920831a99 100644 --- a/processor/transformer/transformer_test.go +++ b/processor/transformer/transformer_test.go @@ -142,85 +142,151 @@ func (et *endpointTransformer) ServeHTTP(w http.ResponseWriter, r *http.Request) } func TestTransformer(t *testing.T) { - t.Run("success", func(t *testing.T) { - ft := &fakeTransformer{ - t: t, - } - - srv := httptest.NewServer(ft) - defer srv.Close() - - tc := []struct { - batchSize int - eventsCount int - failEvery int - }{ - {batchSize: 0, eventsCount: 0}, - {batchSize: 10, eventsCount: 100}, - {batchSize: 10, eventsCount: 9}, - {batchSize: 10, eventsCount: 91}, - {batchSize: 10, eventsCount: 99}, - {batchSize: 10, eventsCount: 1}, - {batchSize: 10, eventsCount: 80, failEvery: 4}, - {batchSize: 10, eventsCount: 80, failEvery: 1}, - } + clientTypes := []string{"stdlib", "recycled", "httplb"} + for _, clientType := range clientTypes { + t.Run(fmt.Sprintf("with %s client", clientType), func(t *testing.T) { + conf := config.New() + conf.Set("Transformer.Client.type", clientType) + + t.Run("success", func(t *testing.T) { + ft := &fakeTransformer{ + t: t, + } - for _, tt := range tc { - statsStore, err := memstats.New() - require.NoError(t, err) + srv := httptest.NewServer(ft) + defer srv.Close() - tr := handle{} - tr.stat = statsStore - tr.logger = logger.NOP - tr.conf = config.Default - tr.httpClient = srv.Client() - tr.guardConcurrency = make(chan struct{}, 200) - tr.sentStat = tr.stat.NewStat("transformer_sent", stats.CountType) - tr.receivedStat = tr.stat.NewStat("transformer_received", stats.CountType) - tr.cpDownGauge = tr.stat.NewStat("control_plane_down", stats.GaugeType) - tr.config.timeoutDuration = 1 * time.Second - tr.config.failOnUserTransformTimeout = config.SingleValueLoader(true) - tr.config.failOnError = config.SingleValueLoader(true) - tr.config.maxRetryBackoffInterval = config.SingleValueLoader(1 * time.Second) - tr.config.maxRetry = config.SingleValueLoader(1) - - batchSize := tt.batchSize - eventsCount := tt.eventsCount - failEvery := tt.failEvery - - events := make([]TransformerEvent, eventsCount) - expectedResponse := Response{} - - transformationID := rand.String(10) - - destinationConfig := backendconfigtest.NewDestinationBuilder("WEBHOOK"). - WithUserTransformation(transformationID, rand.String(10)).Build() - - metadata := Metadata{ - DestinationType: destinationConfig.DestinationDefinition.Name, - SourceID: rand.String(10), - DestinationID: destinationConfig.ID, - TransformationID: destinationConfig.Transformations[0].ID, - } - - for i := range events { - msgID := fmt.Sprintf("messageID-%d", i) - statusCode := http.StatusOK - - if failEvery != 0 && i%failEvery == 0 { - statusCode = http.StatusBadRequest + tc := []struct { + batchSize int + eventsCount int + failEvery int + }{ + {batchSize: 0, eventsCount: 0}, + {batchSize: 10, eventsCount: 100}, + {batchSize: 10, eventsCount: 9}, + {batchSize: 10, eventsCount: 91}, + {batchSize: 10, eventsCount: 99}, + {batchSize: 10, eventsCount: 1}, + {batchSize: 10, eventsCount: 80, failEvery: 4}, + {batchSize: 10, eventsCount: 80, failEvery: 1}, } - metadata := metadata - metadata.MessageID = msgID + for _, tt := range tc { + statsStore, err := memstats.New() + require.NoError(t, err) + + tr := handle{} + tr.stat = statsStore + tr.logger = logger.NOP + tr.conf = conf + tr.httpClient = srv.Client() + tr.guardConcurrency = make(chan struct{}, 200) + tr.sentStat = tr.stat.NewStat("transformer_sent", stats.CountType) + tr.receivedStat = tr.stat.NewStat("transformer_received", stats.CountType) + tr.cpDownGauge = tr.stat.NewStat("control_plane_down", stats.GaugeType) + tr.config.timeoutDuration = 1 * time.Second + tr.config.failOnUserTransformTimeout = config.SingleValueLoader(true) + tr.config.failOnError = config.SingleValueLoader(true) + tr.config.maxRetryBackoffInterval = config.SingleValueLoader(1 * time.Second) + tr.config.maxRetry = config.SingleValueLoader(1) + + batchSize := tt.batchSize + eventsCount := tt.eventsCount + failEvery := tt.failEvery + + events := make([]TransformerEvent, eventsCount) + expectedResponse := Response{} + + transformationID := rand.String(10) + + destinationConfig := backendconfigtest.NewDestinationBuilder("WEBHOOK"). + WithUserTransformation(transformationID, rand.String(10)).Build() + + metadata := Metadata{ + DestinationType: destinationConfig.DestinationDefinition.Name, + SourceID: rand.String(10), + DestinationID: destinationConfig.ID, + TransformationID: destinationConfig.Transformations[0].ID, + } + + for i := range events { + msgID := fmt.Sprintf("messageID-%d", i) + statusCode := http.StatusOK + + if failEvery != 0 && i%failEvery == 0 { + statusCode = http.StatusBadRequest + } + + metadata := metadata + metadata.MessageID = msgID - events[i] = TransformerEvent{ - Metadata: metadata, + events[i] = TransformerEvent{ + Metadata: metadata, + Message: map[string]interface{}{ + "src-key-1": msgID, + "forceStatusCode": statusCode, + }, + Destination: destinationConfig, + Credentials: []Credential{ + { + ID: "test-credential", + Key: "test-key", + Value: "test-value", + IsSecret: false, + }, + }, + } + + tResp := TransformerResponse{ + Metadata: metadata, + StatusCode: statusCode, + Output: map[string]interface{}{ + "src-key-1": msgID, + "echo-key-1": msgID, + }, + } + + if statusCode < http.StatusBadRequest { + expectedResponse.Events = append(expectedResponse.Events, tResp) + } else { + tResp.Error = "error" + expectedResponse.FailedEvents = append(expectedResponse.FailedEvents, tResp) + } + } + + rsp := tr.transform(context.TODO(), events, srv.URL, batchSize, "test-stage") + require.Equal(t, expectedResponse, rsp) + + metrics := statsStore.GetByName("processor.transformer_request_time") + if tt.eventsCount > 0 { + require.NotEmpty(t, metrics) + for _, m := range metrics { + require.Equal(t, stats.Tags{ + "stage": "test-stage", + "sourceId": metadata.SourceID, + "destinationType": destinationConfig.DestinationDefinition.Name, + "destinationId": destinationConfig.ID, + "transformationId": destinationConfig.Transformations[0].ID, + + // Legacy tags: to be removed + "dest_type": destinationConfig.DestinationDefinition.Name, + "dest_id": destinationConfig.ID, + "src_id": metadata.SourceID, + }, m.Tags) + } + } + } + }) + + t.Run("timeout", func(t *testing.T) { + msgID := "messageID-0" + events := append([]TransformerEvent{}, TransformerEvent{ + Metadata: Metadata{ + MessageID: msgID, + }, Message: map[string]interface{}{ - "src-key-1": msgID, - "forceStatusCode": statusCode, + "src-key-1": msgID, }, - Destination: destinationConfig, Credentials: []Credential{ { ID: "test-credential", @@ -229,319 +295,128 @@ func TestTransformer(t *testing.T) { IsSecret: false, }, }, - } + }) - tResp := TransformerResponse{ - Metadata: metadata, - StatusCode: statusCode, - Output: map[string]interface{}{ - "src-key-1": msgID, - "echo-key-1": msgID, + testCases := []struct { + name string + retries int + expectedRetries int + expectPanic bool + stage string + expectedResponse []TransformerResponse + failOnUserTransformTimeout bool + }{ + { + name: "user transformation timeout", + retries: 3, + stage: userTransformerStage, + expectPanic: true, + failOnUserTransformTimeout: false, }, - } - - if statusCode < http.StatusBadRequest { - expectedResponse.Events = append(expectedResponse.Events, tResp) - } else { - tResp.Error = "error" - expectedResponse.FailedEvents = append(expectedResponse.FailedEvents, tResp) - } - } - - rsp := tr.transform(context.TODO(), events, srv.URL, batchSize, "test-stage") - require.Equal(t, expectedResponse, rsp) - - metrics := statsStore.GetByName("processor.transformer_request_time") - if tt.eventsCount > 0 { - require.NotEmpty(t, metrics) - for _, m := range metrics { - require.Equal(t, stats.Tags{ - "stage": "test-stage", - "sourceId": metadata.SourceID, - "destinationType": destinationConfig.DestinationDefinition.Name, - "destinationId": destinationConfig.ID, - "transformationId": destinationConfig.Transformations[0].ID, - - // Legacy tags: to be removed - "dest_type": destinationConfig.DestinationDefinition.Name, - "dest_id": destinationConfig.ID, - "src_id": metadata.SourceID, - }, m.Tags) - } - } - } - }) - - t.Run("timeout", func(t *testing.T) { - msgID := "messageID-0" - events := append([]TransformerEvent{}, TransformerEvent{ - Metadata: Metadata{ - MessageID: msgID, - }, - Message: map[string]interface{}{ - "src-key-1": msgID, - }, - Credentials: []Credential{ - { - ID: "test-credential", - Key: "test-key", - Value: "test-value", - IsSecret: false, - }, - }, - }) - - testCases := []struct { - name string - retries int - expectedRetries int - expectPanic bool - stage string - expectedResponse []TransformerResponse - failOnUserTransformTimeout bool - }{ - { - name: "user transformation timeout", - retries: 3, - stage: userTransformerStage, - expectPanic: true, - failOnUserTransformTimeout: false, - }, - { - name: "user transformation timeout with fail on timeout", - retries: 3, - stage: userTransformerStage, - expectPanic: false, - expectedResponse: []TransformerResponse{ { - Metadata: Metadata{ - MessageID: msgID, - }, - StatusCode: TransformerRequestTimeout, - Output: map[string]interface{}{ - "src-key-1": msgID, + name: "user transformation timeout with fail on timeout", + retries: 3, + stage: userTransformerStage, + expectPanic: false, + expectedResponse: []TransformerResponse{ + { + Metadata: Metadata{ + MessageID: msgID, + }, + StatusCode: TransformerRequestTimeout, + Output: map[string]interface{}{ + "src-key-1": msgID, + }, + }, }, + failOnUserTransformTimeout: true, }, - }, - failOnUserTransformTimeout: true, - }, - { - name: "destination transformation timeout", - retries: 3, - stage: destTransformerStage, - expectPanic: true, - failOnUserTransformTimeout: false, - }, - { - name: "destination transformation timeout with fail on timeout", - retries: 3, - stage: destTransformerStage, - expectPanic: true, - failOnUserTransformTimeout: true, - }, - } - - for _, tc := range testCases { - tc := tc - - t.Run(tc.name, func(t *testing.T) { - ch := make(chan struct{}) - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - <-ch - })) - defer srv.Close() - - client := srv.Client() - client.Timeout = 1 * time.Millisecond - - tr := handle{} - tr.config.timeoutDuration = 1 * time.Millisecond - tr.stat = stats.Default - tr.logger = logger.NOP - tr.conf = config.Default - tr.httpClient = client - tr.config.maxRetry = config.SingleValueLoader(tc.retries) - tr.config.failOnUserTransformTimeout = config.SingleValueLoader(tc.failOnUserTransformTimeout) - tr.cpDownGauge = tr.stat.NewStat("control_plane_down", stats.GaugeType) - tr.config.maxRetryBackoffInterval = config.SingleValueLoader(1 * time.Second) + { + name: "destination transformation timeout", + retries: 3, + stage: destTransformerStage, + expectPanic: true, + failOnUserTransformTimeout: false, + }, + { + name: "destination transformation timeout with fail on timeout", + retries: 3, + stage: destTransformerStage, + expectPanic: true, + failOnUserTransformTimeout: true, + }, + } - if tc.expectPanic { - require.Panics(t, func() { - _ = tr.request(context.TODO(), srv.URL, tc.stage, events) + for _, tc := range testCases { + tc := tc + + t.Run(tc.name, func(t *testing.T) { + ch := make(chan struct{}) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + <-ch + })) + defer srv.Close() + + client := srv.Client() + client.Timeout = 1 * time.Millisecond + + tr := handle{} + tr.config.timeoutDuration = 1 * time.Millisecond + tr.stat = stats.Default + tr.logger = logger.NOP + tr.conf = conf + tr.httpClient = client + tr.config.maxRetry = config.SingleValueLoader(tc.retries) + tr.config.failOnUserTransformTimeout = config.SingleValueLoader(tc.failOnUserTransformTimeout) + tr.cpDownGauge = tr.stat.NewStat("control_plane_down", stats.GaugeType) + tr.config.maxRetryBackoffInterval = config.SingleValueLoader(1 * time.Second) + + if tc.expectPanic { + require.Panics(t, func() { + _ = tr.request(context.TODO(), srv.URL, tc.stage, events) + }) + close(ch) + return + } + + _, err := url.Parse(srv.URL) + require.NoError(t, err) + + rsp := tr.request(context.TODO(), srv.URL, tc.stage, events) + require.Len(t, rsp, 1) + require.Equal(t, rsp[0].StatusCode, TransformerRequestTimeout) + require.Equal(t, rsp[0].Metadata, Metadata{ + MessageID: msgID, + }) + require.Contains(t, rsp[0].Error, "transformer request timed out:") + close(ch) }) - close(ch) - return } - - _, err := url.Parse(srv.URL) - require.NoError(t, err) - - rsp := tr.request(context.TODO(), srv.URL, tc.stage, events) - require.Len(t, rsp, 1) - require.Equal(t, rsp[0].StatusCode, TransformerRequestTimeout) - require.Equal(t, rsp[0].Metadata, Metadata{ - MessageID: msgID, - }) - require.Contains(t, rsp[0].Error, "transformer request timed out:") - close(ch) }) - } - }) - - t.Run("endless retries in case of control plane down", func(t *testing.T) { - msgID := "messageID-0" - events := append([]TransformerEvent{}, TransformerEvent{ - Metadata: Metadata{ - MessageID: msgID, - }, - Message: map[string]interface{}{ - "src-key-1": msgID, - }, - Credentials: []Credential{ - { - ID: "test-credential", - Key: "test-key", - Value: "test-value", - IsSecret: false, - }, - }, - }) - - elt := &endlessLoopTransformer{ - maxRetryCount: 3, - statusCode: StatusCPDown, - statusError: "control plane not reachable", - apiVersion: types.SupportedTransformerApiVersion, - t: t, - } - - srv := httptest.NewServer(elt) - defer srv.Close() - - tr := handle{} - tr.stat = stats.Default - tr.logger = logger.NOP - tr.conf = config.Default - tr.httpClient = srv.Client() - tr.config.maxRetry = config.SingleValueLoader(1) - tr.config.maxRetryBackoffInterval = config.SingleValueLoader(1 * time.Second) - tr.config.timeoutDuration = 1 * time.Second - tr.config.failOnUserTransformTimeout = config.SingleValueLoader(false) - tr.cpDownGauge = tr.stat.NewStat("control_plane_down", stats.GaugeType) - - rsp := tr.request(context.TODO(), srv.URL, "test-stage", events) - require.Equal(t, rsp, []TransformerResponse{ - { - Metadata: Metadata{ - MessageID: msgID, - }, - StatusCode: http.StatusOK, - Output: map[string]interface{}{ - "src-key-1": msgID, - }, - }, - }) - require.Equal(t, elt.retryCount, 3) - }) - t.Run("retries", func(t *testing.T) { - msgID := "messageID-0" - events := append([]TransformerEvent{}, TransformerEvent{ - Metadata: Metadata{ - MessageID: msgID, - }, - Message: map[string]interface{}{ - "src-key-1": msgID, - }, - Destination: backendconfig.DestinationT{ - Transformations: []backendconfig.TransformationT{ - { - ID: "test-transformation", - VersionID: "test-version", + t.Run("endless retries in case of control plane down", func(t *testing.T) { + msgID := "messageID-0" + events := append([]TransformerEvent{}, TransformerEvent{ + Metadata: Metadata{ + MessageID: msgID, }, - }, - }, - Credentials: []Credential{ - { - ID: "test-credential", - Key: "test-key", - Value: "test-value", - IsSecret: false, - }, - }, - }) - - testCases := []struct { - name string - retries int - maxRetryCount int - statusCode int - statusError string - expectedRetries int - expectPanic bool - expectedResponse []TransformerResponse - failOnError bool - }{ - { - name: "too many requests", - retries: 3, - maxRetryCount: 10, - statusCode: http.StatusTooManyRequests, - statusError: "too many requests", - expectedRetries: 4, - expectPanic: true, - failOnError: false, - }, - { - name: "too many requests with fail on error", - retries: 3, - maxRetryCount: 10, - statusCode: http.StatusTooManyRequests, - statusError: "too many requests", - expectedRetries: 4, - expectPanic: false, - expectedResponse: []TransformerResponse{ - { - Metadata: Metadata{ - MessageID: msgID, - }, - StatusCode: TransformerRequestFailure, - Error: "transformer request failed: transformer returned status code: 429", + Message: map[string]interface{}{ + "src-key-1": msgID, }, - }, - failOnError: true, - }, - { - name: "transient control plane error", - retries: 30, - maxRetryCount: 3, - statusCode: StatusCPDown, - statusError: "control plane not reachable", - expectedRetries: 3, - expectPanic: false, - expectedResponse: []TransformerResponse{ - { - Metadata: Metadata{ - MessageID: msgID, - }, - StatusCode: http.StatusOK, - Output: map[string]interface{}{ - "src-key-1": msgID, + Credentials: []Credential{ + { + ID: "test-credential", + Key: "test-key", + Value: "test-value", + IsSecret: false, }, }, - }, - failOnError: false, - }, - } - - for _, tc := range testCases { - tc := tc + }) - t.Run(tc.name, func(t *testing.T) { elt := &endlessLoopTransformer{ - maxRetryCount: tc.maxRetryCount, - statusCode: tc.statusCode, - statusError: tc.statusError, + maxRetryCount: 3, + statusCode: StatusCPDown, + statusError: "control plane not reachable", apiVersion: types.SupportedTransformerApiVersion, t: t, } @@ -552,69 +427,16 @@ func TestTransformer(t *testing.T) { tr := handle{} tr.stat = stats.Default tr.logger = logger.NOP - tr.conf = config.Default + tr.conf = conf tr.httpClient = srv.Client() + tr.config.maxRetry = config.SingleValueLoader(1) + tr.config.maxRetryBackoffInterval = config.SingleValueLoader(1 * time.Second) + tr.config.timeoutDuration = 1 * time.Second tr.config.failOnUserTransformTimeout = config.SingleValueLoader(false) - tr.config.maxRetry = config.SingleValueLoader(tc.retries) - tr.config.failOnError = config.SingleValueLoader(tc.failOnError) tr.cpDownGauge = tr.stat.NewStat("control_plane_down", stats.GaugeType) - tr.config.timeoutDuration = 1 * time.Second - tr.config.maxRetryBackoffInterval = config.SingleValueLoader(1 * time.Second) - - if tc.expectPanic { - require.Panics(t, func() { - _ = tr.request(context.TODO(), srv.URL, "test-stage", events) - }) - require.Equal(t, elt.retryCount, tc.expectedRetries) - return - } rsp := tr.request(context.TODO(), srv.URL, "test-stage", events) - require.Equal(t, tc.expectedResponse, rsp) - require.Equal(t, tc.expectedRetries, elt.retryCount) - }) - } - }) - - t.Run("version compatibility", func(t *testing.T) { - msgID := "messageID-0" - events := append([]TransformerEvent{}, TransformerEvent{ - Metadata: Metadata{ - MessageID: msgID, - }, - Message: map[string]interface{}{ - "src-key-1": msgID, - }, - Destination: backendconfig.DestinationT{ - Transformations: []backendconfig.TransformationT{ - { - ID: "test-transformation", - VersionID: "test-version", - }, - }, - }, - Credentials: []Credential{ - { - ID: "test-credential", - Key: "test-key", - Value: "test-value", - IsSecret: false, - }, - }, - }) - - testCases := []struct { - name string - apiVersion int - skipApiVersion bool - expectPanic bool - expectedResponse []TransformerResponse - }{ - { - name: "compatible api version", - apiVersion: types.SupportedTransformerApiVersion, - expectPanic: false, - expectedResponse: []TransformerResponse{ + require.Equal(t, rsp, []TransformerResponse{ { Metadata: Metadata{ MessageID: msgID, @@ -624,143 +446,287 @@ func TestTransformer(t *testing.T) { "src-key-1": msgID, }, }, - }, - }, - { - name: "incompatible api version", - apiVersion: 1, - expectPanic: true, - }, - { - name: "unexpected api version", - skipApiVersion: true, - expectPanic: true, - }, - } + }) + require.Equal(t, elt.retryCount, 3) + }) - for _, tc := range testCases { - tc := tc + t.Run("retries", func(t *testing.T) { + msgID := "messageID-0" + events := append([]TransformerEvent{}, TransformerEvent{ + Metadata: Metadata{ + MessageID: msgID, + }, + Message: map[string]interface{}{ + "src-key-1": msgID, + }, + Destination: backendconfig.DestinationT{ + Transformations: []backendconfig.TransformationT{ + { + ID: "test-transformation", + VersionID: "test-version", + }, + }, + }, + Credentials: []Credential{ + { + ID: "test-credential", + Key: "test-key", + Value: "test-value", + IsSecret: false, + }, + }, + }) - t.Run(tc.name, func(t *testing.T) { - elt := &endlessLoopTransformer{ - maxRetryCount: 0, - skipApiVersion: tc.skipApiVersion, - apiVersion: tc.apiVersion, - t: t, + testCases := []struct { + name string + retries int + maxRetryCount int + statusCode int + statusError string + expectedRetries int + expectPanic bool + expectedResponse []TransformerResponse + failOnError bool + }{ + { + name: "too many requests", + retries: 3, + maxRetryCount: 10, + statusCode: http.StatusTooManyRequests, + statusError: "too many requests", + expectedRetries: 4, + expectPanic: true, + failOnError: false, + }, + { + name: "too many requests with fail on error", + retries: 3, + maxRetryCount: 10, + statusCode: http.StatusTooManyRequests, + statusError: "too many requests", + expectedRetries: 4, + expectPanic: false, + expectedResponse: []TransformerResponse{ + { + Metadata: Metadata{ + MessageID: msgID, + }, + StatusCode: TransformerRequestFailure, + Error: "transformer request failed: transformer returned status code: 429", + }, + }, + failOnError: true, + }, + { + name: "transient control plane error", + retries: 30, + maxRetryCount: 3, + statusCode: StatusCPDown, + statusError: "control plane not reachable", + expectedRetries: 3, + expectPanic: false, + expectedResponse: []TransformerResponse{ + { + Metadata: Metadata{ + MessageID: msgID, + }, + StatusCode: http.StatusOK, + Output: map[string]interface{}{ + "src-key-1": msgID, + }, + }, + }, + failOnError: false, + }, } - srv := httptest.NewServer(elt) - defer srv.Close() - - tr := handle{} - tr.httpClient = srv.Client() - tr.stat = stats.Default - tr.conf = config.Default - tr.logger = logger.NOP - tr.cpDownGauge = tr.stat.NewStat("control_plane_down", stats.GaugeType) - tr.config.maxRetry = config.SingleValueLoader(1) - tr.config.timeoutDuration = 1 * time.Second - tr.config.maxRetryBackoffInterval = config.SingleValueLoader(1 * time.Second) - - if tc.expectPanic { - require.Panics(t, func() { - _ = tr.request(context.TODO(), srv.URL, "test-stage", events) + for _, tc := range testCases { + tc := tc + + t.Run(tc.name, func(t *testing.T) { + elt := &endlessLoopTransformer{ + maxRetryCount: tc.maxRetryCount, + statusCode: tc.statusCode, + statusError: tc.statusError, + apiVersion: types.SupportedTransformerApiVersion, + t: t, + } + + srv := httptest.NewServer(elt) + defer srv.Close() + + tr := handle{} + tr.stat = stats.Default + tr.logger = logger.NOP + tr.conf = conf + tr.httpClient = srv.Client() + tr.config.failOnUserTransformTimeout = config.SingleValueLoader(false) + tr.config.maxRetry = config.SingleValueLoader(tc.retries) + tr.config.failOnError = config.SingleValueLoader(tc.failOnError) + tr.cpDownGauge = tr.stat.NewStat("control_plane_down", stats.GaugeType) + tr.config.timeoutDuration = 1 * time.Second + tr.config.maxRetryBackoffInterval = config.SingleValueLoader(1 * time.Second) + + if tc.expectPanic { + require.Panics(t, func() { + _ = tr.request(context.TODO(), srv.URL, "test-stage", events) + }) + require.Equal(t, elt.retryCount, tc.expectedRetries) + return + } + + rsp := tr.request(context.TODO(), srv.URL, "test-stage", events) + require.Equal(t, tc.expectedResponse, rsp) + require.Equal(t, tc.expectedRetries, elt.retryCount) }) - return } - - rsp := tr.request(context.TODO(), srv.URL, "test-stage", events) - require.Equal(t, tc.expectedResponse, rsp) }) - } - }) - t.Run("endpoints", func(t *testing.T) { - msgID := "messageID-0" - expectedResponse := Response{ - Events: []TransformerResponse{ - { - Output: map[string]interface{}{ - "src-key-1": msgID, - }, + t.Run("version compatibility", func(t *testing.T) { + msgID := "messageID-0" + events := append([]TransformerEvent{}, TransformerEvent{ Metadata: Metadata{ MessageID: msgID, }, - StatusCode: http.StatusOK, - }, - }, - } - events := append([]TransformerEvent{}, TransformerEvent{ - Metadata: Metadata{ - MessageID: msgID, - }, - Message: map[string]interface{}{ - "src-key-1": msgID, - }, - Destination: backendconfig.DestinationT{ - DestinationDefinition: backendconfig.DestinationDefinitionT{ - Name: "test-destination", - }, - Transformations: []backendconfig.TransformationT{ - { - ID: "test-transformation", - VersionID: "test-version", + Message: map[string]interface{}{ + "src-key-1": msgID, }, - }, - }, - Credentials: []Credential{ - { - ID: "test-credential", - Key: "test-key", - Value: "test-value", - IsSecret: false, - }, - }, - }) - - t.Run("Destination transformations", func(t *testing.T) { - et := &endpointTransformer{ - supportedPaths: []string{"/v0/destinations/test-destination"}, - t: t, - } + Destination: backendconfig.DestinationT{ + Transformations: []backendconfig.TransformationT{ + { + ID: "test-transformation", + VersionID: "test-version", + }, + }, + }, + Credentials: []Credential{ + { + ID: "test-credential", + Key: "test-key", + Value: "test-value", + IsSecret: false, + }, + }, + }) - srv := httptest.NewServer(et) - defer srv.Close() + testCases := []struct { + name string + apiVersion int + skipApiVersion bool + expectPanic bool + expectedResponse []TransformerResponse + }{ + { + name: "compatible api version", + apiVersion: types.SupportedTransformerApiVersion, + expectPanic: false, + expectedResponse: []TransformerResponse{ + { + Metadata: Metadata{ + MessageID: msgID, + }, + StatusCode: http.StatusOK, + Output: map[string]interface{}{ + "src-key-1": msgID, + }, + }, + }, + }, + { + name: "incompatible api version", + apiVersion: 1, + expectPanic: true, + }, + { + name: "unexpected api version", + skipApiVersion: true, + expectPanic: true, + }, + } - c := config.New() - c.Set("Processor.maxRetry", 1) - c.Set("DEST_TRANSFORM_URL", srv.URL) + for _, tc := range testCases { + tc := tc + + t.Run(tc.name, func(t *testing.T) { + elt := &endlessLoopTransformer{ + maxRetryCount: 0, + skipApiVersion: tc.skipApiVersion, + apiVersion: tc.apiVersion, + t: t, + } + + srv := httptest.NewServer(elt) + defer srv.Close() + + tr := handle{} + tr.httpClient = srv.Client() + tr.stat = stats.Default + tr.conf = conf + tr.logger = logger.NOP + tr.cpDownGauge = tr.stat.NewStat("control_plane_down", stats.GaugeType) + tr.config.maxRetry = config.SingleValueLoader(1) + tr.config.timeoutDuration = 1 * time.Second + tr.config.maxRetryBackoffInterval = config.SingleValueLoader(1 * time.Second) + + if tc.expectPanic { + require.Panics(t, func() { + _ = tr.request(context.TODO(), srv.URL, "test-stage", events) + }) + return + } + + rsp := tr.request(context.TODO(), srv.URL, "test-stage", events) + require.Equal(t, tc.expectedResponse, rsp) + }) + } + }) - tr := NewTransformer(c, logger.NOP, stats.Default, WithClient(srv.Client())) - rsp := tr.Transform(context.TODO(), events, 10) - require.Equal(t, rsp, expectedResponse) - }) + t.Run("endpoints", func(t *testing.T) { + msgID := "messageID-0" + expectedResponse := Response{ + Events: []TransformerResponse{ + { + Output: map[string]interface{}{ + "src-key-1": msgID, + }, + Metadata: Metadata{ + MessageID: msgID, + }, + StatusCode: http.StatusOK, + }, + }, + } + events := append([]TransformerEvent{}, TransformerEvent{ + Metadata: Metadata{ + MessageID: msgID, + }, + Message: map[string]interface{}{ + "src-key-1": msgID, + }, + Destination: backendconfig.DestinationT{ + DestinationDefinition: backendconfig.DestinationDefinitionT{ + Name: "test-destination", + }, + Transformations: []backendconfig.TransformationT{ + { + ID: "test-transformation", + VersionID: "test-version", + }, + }, + }, + Credentials: []Credential{ + { + ID: "test-credential", + Key: "test-key", + Value: "test-value", + IsSecret: false, + }, + }, + }) - t.Run("Destination warehouse transformations", func(t *testing.T) { - testCases := []struct { - name string - destinationType string - }{ - { - name: "rs", - destinationType: warehouseutils.RS, - }, - { - name: "clickhouse", - destinationType: warehouseutils.CLICKHOUSE, - }, - { - name: "snowflake", - destinationType: warehouseutils.SNOWFLAKE, - }, - } - - for _, tc := range testCases { - tc := tc - - t.Run(tc.name, func(t *testing.T) { + t.Run("Destination transformations", func(t *testing.T) { et := &endpointTransformer{ - supportedPaths: []string{`/v0/destinations/` + tc.name}, + supportedPaths: []string{"/v0/destinations/test-destination"}, t: t, } @@ -772,77 +738,119 @@ func TestTransformer(t *testing.T) { c.Set("DEST_TRANSFORM_URL", srv.URL) tr := NewTransformer(c, logger.NOP, stats.Default, WithClient(srv.Client())) + rsp := tr.Transform(context.TODO(), events, 10) + require.Equal(t, rsp, expectedResponse) + }) - events := append([]TransformerEvent{}, TransformerEvent{ - Metadata: Metadata{ - MessageID: msgID, - }, - Message: map[string]interface{}{ - "src-key-1": msgID, + t.Run("Destination warehouse transformations", func(t *testing.T) { + testCases := []struct { + name string + destinationType string + }{ + { + name: "rs", + destinationType: warehouseutils.RS, }, - Destination: backendconfig.DestinationT{ - DestinationDefinition: backendconfig.DestinationDefinitionT{ - Name: tc.destinationType, - }, - Transformations: []backendconfig.TransformationT{ - { - ID: "test-transformation", - VersionID: "test-version", - }, - }, + { + name: "clickhouse", + destinationType: warehouseutils.CLICKHOUSE, }, - Credentials: []Credential{ - { - ID: "test-credential", - Key: "test-key", - Value: "test-value", - IsSecret: false, - }, + { + name: "snowflake", + destinationType: warehouseutils.SNOWFLAKE, }, - }) + } - rsp := tr.Transform(context.TODO(), events, 10) - require.Equal(t, rsp, expectedResponse) + for _, tc := range testCases { + tc := tc + + t.Run(tc.name, func(t *testing.T) { + et := &endpointTransformer{ + supportedPaths: []string{`/v0/destinations/` + tc.name}, + t: t, + } + + srv := httptest.NewServer(et) + defer srv.Close() + + c := config.New() + c.Set("Processor.maxRetry", 1) + c.Set("DEST_TRANSFORM_URL", srv.URL) + + tr := NewTransformer(c, logger.NOP, stats.Default, WithClient(srv.Client())) + + events := append([]TransformerEvent{}, TransformerEvent{ + Metadata: Metadata{ + MessageID: msgID, + }, + Message: map[string]interface{}{ + "src-key-1": msgID, + }, + Destination: backendconfig.DestinationT{ + DestinationDefinition: backendconfig.DestinationDefinitionT{ + Name: tc.destinationType, + }, + Transformations: []backendconfig.TransformationT{ + { + ID: "test-transformation", + VersionID: "test-version", + }, + }, + }, + Credentials: []Credential{ + { + ID: "test-credential", + Key: "test-key", + Value: "test-value", + IsSecret: false, + }, + }, + }) + + rsp := tr.Transform(context.TODO(), events, 10) + require.Equal(t, rsp, expectedResponse) + }) + } }) - } - }) - t.Run("User transformations", func(t *testing.T) { - et := &endpointTransformer{ - supportedPaths: []string{"/customTransform"}, - t: t, - } + t.Run("User transformations", func(t *testing.T) { + et := &endpointTransformer{ + supportedPaths: []string{"/customTransform"}, + t: t, + } - srv := httptest.NewServer(et) - defer srv.Close() + srv := httptest.NewServer(et) + defer srv.Close() - c := config.New() - c.Set("Processor.maxRetry", 1) - c.Set("USER_TRANSFORM_URL", srv.URL) + c := config.New() + c.Set("Processor.maxRetry", 1) + c.Set("USER_TRANSFORM_URL", srv.URL) - tr := NewTransformer(c, logger.NOP, stats.Default, WithClient(srv.Client())) - rsp := tr.UserTransform(context.TODO(), events, 10) - require.Equal(t, rsp, expectedResponse) - }) + tr := NewTransformer(c, logger.NOP, stats.Default, WithClient(srv.Client())) + rsp := tr.UserTransform(context.TODO(), events, 10) + require.Equal(t, rsp, expectedResponse) + }) - t.Run("Tracking Plan Validations", func(t *testing.T) { - et := &endpointTransformer{ - supportedPaths: []string{"/v0/validate"}, - t: t, - } + t.Run("Tracking Plan Validations", func(t *testing.T) { + et := &endpointTransformer{ + supportedPaths: []string{"/v0/validate"}, + t: t, + } - srv := httptest.NewServer(et) - defer srv.Close() + srv := httptest.NewServer(et) + defer srv.Close() - c := config.New() - c.Set("Processor.maxRetry", 1) - c.Set("DEST_TRANSFORM_URL", srv.URL) + c := config.New() + c.Set("Processor.maxRetry", 1) + c.Set("DEST_TRANSFORM_URL", srv.URL) - tr := NewTransformer(c, logger.NOP, stats.Default, WithClient(srv.Client())) - rsp := tr.Validate(context.TODO(), events, 10) - require.Equal(t, rsp, expectedResponse) + tr := NewTransformer(c, logger.NOP, stats.Default, WithClient(srv.Client())) + rsp := tr.Validate(context.TODO(), events, 10) + require.Equal(t, rsp, expectedResponse) + }) + }) }) - }) + } } func TestLongRunningTransformation(t *testing.T) {