diff --git a/mocks/router/clevertap_segment/clevertap_segment_mock.go b/mocks/router/clevertap_segment/clevertap_segment_mock.go new file mode 100644 index 0000000000..ffc44d75f8 --- /dev/null +++ b/mocks/router/clevertap_segment/clevertap_segment_mock.go @@ -0,0 +1,71 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/clevertap_segment (interfaces: ClevertapService) +// +// Generated by this command: +// +// mockgen -destination=./clevertap_segment_mock.go -package=mocks github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/clevertap_segment ClevertapService +// + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + clevertapSegment "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/clevertap_segment" + gomock "go.uber.org/mock/gomock" +) + +// MockClevertapService is a mock of ClevertapService interface. +type MockClevertapService struct { + ctrl *gomock.Controller + recorder *MockClevertapServiceMockRecorder + isgomock struct{} +} + +// MockClevertapServiceMockRecorder is the mock recorder for MockClevertapService. +type MockClevertapServiceMockRecorder struct { + mock *MockClevertapService +} + +// NewMockClevertapService creates a new mock instance. +func NewMockClevertapService(ctrl *gomock.Controller) *MockClevertapService { + mock := &MockClevertapService{ctrl: ctrl} + mock.recorder = &MockClevertapServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockClevertapService) EXPECT() *MockClevertapServiceMockRecorder { + return m.recorder +} + +// MakeHTTPRequest mocks base method. +func (m *MockClevertapService) MakeHTTPRequest(data *clevertapSegment.HttpRequestData) ([]byte, int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MakeHTTPRequest", data) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(int) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// MakeHTTPRequest indicates an expected call of MakeHTTPRequest. +func (mr *MockClevertapServiceMockRecorder) MakeHTTPRequest(data any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MakeHTTPRequest", reflect.TypeOf((*MockClevertapService)(nil).MakeHTTPRequest), data) +} + +// UploadBulkFile mocks base method. +func (m *MockClevertapService) UploadBulkFile(filePath, presignedURL string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UploadBulkFile", filePath, presignedURL) + ret0, _ := ret[0].(error) + return ret0 +} + +// UploadBulkFile indicates an expected call of UploadBulkFile. +func (mr *MockClevertapServiceMockRecorder) UploadBulkFile(filePath, presignedURL any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UploadBulkFile", reflect.TypeOf((*MockClevertapService)(nil).UploadBulkFile), filePath, presignedURL) +} diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_suite_test.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_suite_test.go new file mode 100644 index 0000000000..e0dd596355 --- /dev/null +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_suite_test.go @@ -0,0 +1,13 @@ +package clevertapSegment_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestClevertapSegment(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "clevertap_segment Suite") +} diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go new file mode 100644 index 0000000000..9bbe2a8c02 --- /dev/null +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go @@ -0,0 +1,363 @@ +package clevertapSegment_test + +import ( + "fmt" + "os" + "path/filepath" + "sync" + + "go.uber.org/mock/gomock" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/rudderlabs/rudder-go-kit/stats" + + "github.com/rudderlabs/rudder-go-kit/bytesize" + "github.com/rudderlabs/rudder-go-kit/config" + "github.com/rudderlabs/rudder-go-kit/logger" + + backendconfig "github.com/rudderlabs/rudder-server/backend-config" + + mocks "github.com/rudderlabs/rudder-server/mocks/router/clevertap_segment" + ClevertapSegment "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/clevertap_segment" + "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" + "github.com/rudderlabs/rudder-server/utils/misc" +) + +var ( + once sync.Once + destination = backendconfig.DestinationT{ + Name: "CLEVERTAP_SEGMENT", + Config: map[string]interface{}{ + "clevertapAccountId": "1234", + "clevertapAccountKey": "1234567", + }, + WorkspaceID: "workspace_id", + } + currentDir, _ = os.Getwd() +) + +func initClevertap() { + once.Do(func() { + logger.Reset() + misc.Init() + }) +} + +var _ = Describe("CLEVERTAP_SEGMENT test", func() { + Context("When uploading the file", func() { + BeforeEach(func() { + config.Reset() + config.Set("BatchRouter.CLEVERTAP_SEGMENT.MaxUploadLimit", 1*bytesize.MB) + + // Ensure the log directory exists + logDir := "/tmp/rudder-async-destination-logs" + err := os.MkdirAll(logDir, os.ModePerm) // Create the directory if it doesn't exist + Expect(err).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + config.Reset() + }) + + It("TestClevertapUploadWrongFilepath", func() { + initClevertap() + ctrl := gomock.NewController(GinkgoT()) + defer ctrl.Finish() + endpoints := &ClevertapSegment.Endpoints{ + BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", + NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", + } + + clevertapService := mocks.NewMockClevertapService(ctrl) + clevertapServiceImpl := ClevertapSegment.ClevertapServiceImpl{ + BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", + NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", + ConnectionConfig: &ClevertapSegment.ConnectionConfig{ + SourceId: "source123", + DestinationId: "destination456", + Enabled: true, + Config: ClevertapSegment.ConnConfig{ + Destination: ClevertapSegment.Destination{ + SchemaVersion: "v1.0", + SegmentName: "User Segment A", + AdminEmail: "admin@example.com", + SenderName: "Rudderstack", + }, + }, + }, + } + bulkUploader := common.SimpleAsyncDestinationManager{UploaderAndTransformer: ClevertapSegment.NewClevertapBulkUploader(logger.NOP, stats.NOP, "CLEVERTAP_SEGMENT", destination.Config["clevertapAccountKey"].(string), destination.Config["clevertapAccountId"].(string), endpoints, clevertapService, clevertapServiceImpl.ConnectionConfig)} + asyncDestination := common.AsyncDestinationStruct{ + ImportingJobIDs: []int64{1, 2, 3, 4}, + FailedJobIDs: []int64{}, + FileName: "", + Destination: &destination, + } + + expected := common.AsyncUploadOutput{ + FailedReason: "got error while transforming the file. failed to open existing file", + ImportingJobIDs: nil, + FailedJobIDs: []int64{1, 2, 3, 4}, + ImportingCount: 0, + FailedCount: 4, + } + + received := bulkUploader.Upload(&asyncDestination) + Expect(received).To(Equal(expected)) + }) + + It("TestClevertapErrorWhileUploadingData", func() { + initClevertap() + ctrl := gomock.NewController(GinkgoT()) + defer ctrl.Finish() + + clevertapService := mocks.NewMockClevertapService(ctrl) + endpoints := &ClevertapSegment.Endpoints{ + BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", + NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", + } + clevertapServiceImpl := ClevertapSegment.ClevertapServiceImpl{ + BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", + NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", + ConnectionConfig: &ClevertapSegment.ConnectionConfig{ + SourceId: "source123", + DestinationId: "destination456", + Enabled: true, + Config: ClevertapSegment.ConnConfig{ + Destination: ClevertapSegment.Destination{ + SchemaVersion: "v1.0", + SegmentName: "User Segment A", + AdminEmail: "admin@example.com", + SenderName: "Rudderstack", + }, + }, + }, + } + + bulkUploader := common.SimpleAsyncDestinationManager{UploaderAndTransformer: ClevertapSegment.NewClevertapBulkUploader(logger.NOP, stats.NOP, "CLEVERTAP_SEGMENT", destination.Config["clevertapAccountKey"].(string), destination.Config["clevertapAccountId"].(string), endpoints, clevertapService, clevertapServiceImpl.ConnectionConfig)} + + asyncDestination := common.AsyncDestinationStruct{ + ImportingJobIDs: []int64{1, 2, 3, 4}, + FailedJobIDs: []int64{}, + FileName: filepath.Join(currentDir, "testdata/uploadData.txt"), + Destination: &destination, + } + + // Mock handling for MakeHTTPRequest + clevertapService.EXPECT(). + MakeHTTPRequest(gomock.Any()). + Return([]byte(`{"presignedS3URL": "https://abc.com", "expiry": "2023-12-31T23:59:59Z", "status": "success", "code": 200}`), 200, nil). + Times(1) + + // Mock expectations + clevertapService.EXPECT(). + UploadBulkFile(gomock.Any(), "https://abc.com"). + Return(fmt.Errorf("Upload failed with status code: 400")). + Times(1) + + expected := common.AsyncUploadOutput{ + FailedReason: "error in uploading the bulk file: Upload failed with status code: 400", + ImportingJobIDs: nil, + FailedJobIDs: []int64{1, 2, 3, 4}, + ImportingCount: 0, + FailedCount: 4, + } + + received := bulkUploader.Upload(&asyncDestination) + Expect(received).To(Equal(expected)) + }) + + It("TestClevertapErrorWhilePreSignedURLFetch", func() { + initClevertap() + ctrl := gomock.NewController(GinkgoT()) + defer ctrl.Finish() + + clevertapService := mocks.NewMockClevertapService(ctrl) + clevertapServiceImpl := ClevertapSegment.ClevertapServiceImpl{ + BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", + NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", + ConnectionConfig: &ClevertapSegment.ConnectionConfig{ + SourceId: "source123", + DestinationId: "destination456", + Enabled: true, + Config: ClevertapSegment.ConnConfig{ + Destination: ClevertapSegment.Destination{ + SchemaVersion: "v1.0", + SegmentName: "User Segment A", + AdminEmail: "admin@example.com", + SenderName: "Rudderstack", + }, + }, + }, + } + + endpoints := &ClevertapSegment.Endpoints{ + BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", + NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", + } + + bulkUploader := common.SimpleAsyncDestinationManager{UploaderAndTransformer: ClevertapSegment.NewClevertapBulkUploader(logger.NOP, stats.NOP, "CLEVERTAP_SEGMENT", destination.Config["clevertapAccountKey"].(string), destination.Config["clevertapAccountId"].(string), endpoints, clevertapService, clevertapServiceImpl.ConnectionConfig)} + + asyncDestination := common.AsyncDestinationStruct{ + ImportingJobIDs: []int64{1, 2, 3, 4}, + FailedJobIDs: []int64{}, + FileName: filepath.Join(currentDir, "testdata/uploadData.txt"), + Destination: &destination, + } + + // Mock handling for MakeHTTPRequest + clevertapService.EXPECT(). + MakeHTTPRequest(gomock.Any()). + Return([]byte(`{"error": "Invalid Credentials", "status": "fail", "code": 401}`), 401, nil). + Times(1) + + expected := common.AsyncUploadOutput{ + AbortReason: "Error while fetching presigned url Error while fetching preSignedUrl: Invalid Credentials", + ImportingJobIDs: nil, + AbortJobIDs: []int64{1, 2, 3, 4}, + ImportingCount: 0, + AbortCount: 4, + } + + received := bulkUploader.Upload(&asyncDestination) + Expect(received).To(Equal(expected)) + }) + + It("TestSuccessfulClevertapUpload", func() { + initClevertap() + ctrl := gomock.NewController(GinkgoT()) + defer ctrl.Finish() + + clevertapService := mocks.NewMockClevertapService(ctrl) + clevertapServiceImpl := ClevertapSegment.ClevertapServiceImpl{ + BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", + NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", + ConnectionConfig: &ClevertapSegment.ConnectionConfig{ + SourceId: "source123", + DestinationId: "destination456", + Enabled: true, + Config: ClevertapSegment.ConnConfig{ + Destination: ClevertapSegment.Destination{ + SchemaVersion: "v1.0", + SegmentName: "User Segment A", + AdminEmail: "admin@example.com", + SenderName: "Rudderstack", + }, + }, + }, + } + endpoints := &ClevertapSegment.Endpoints{ + BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", + NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", + } + bulkUploader := common.SimpleAsyncDestinationManager{UploaderAndTransformer: ClevertapSegment.NewClevertapBulkUploader(logger.NOP, stats.NOP, "CLEVERTAP_SEGMENT", destination.Config["clevertapAccountKey"].(string), destination.Config["clevertapAccountId"].(string), endpoints, clevertapService, clevertapServiceImpl.ConnectionConfig)} + asyncDestination := common.AsyncDestinationStruct{ + ImportingJobIDs: []int64{1, 2, 3, 4}, + FailedJobIDs: []int64{}, + FileName: filepath.Join(currentDir, "testdata/uploadData.txt"), + Destination: &destination, + } + + // Mock handling for MakeHTTPRequest + clevertapService.EXPECT(). + MakeHTTPRequest(gomock.Any()). + Return([]byte(`{"presignedS3URL": "https://abc.com", "expiry": "2023-12-31T23:59:59Z", "status": "success", "code": 200}`), 200, nil). + Times(1) + + // Mock expectations for UploadBulkFile + clevertapService.EXPECT(). + UploadBulkFile(gomock.Any(), "https://abc.com"). + Return(nil). + Times(1) + + clevertapService.EXPECT(). + MakeHTTPRequest(gomock.Any()). + Return([]byte(`{"Segment ID": 1234, "status": "success", "code": 200}`), 200, nil). + Times(1) + + expected := common.AsyncUploadOutput{ + ImportingJobIDs: []int64{1, 2, 3, 4}, + FailedJobIDs: []int64{}, + ImportingCount: 4, + FailedCount: 0, + } + + received := bulkUploader.Upload(&asyncDestination) + Expect(received).To(Equal(expected)) + }) + + It("TestFailureClevertapUploadWhileNaming", func() { + initClevertap() + ctrl := gomock.NewController(GinkgoT()) + defer ctrl.Finish() + + clevertapService := mocks.NewMockClevertapService(ctrl) + endpoints := &ClevertapSegment.Endpoints{ + BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", + NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", + } + clevertapServiceImpl := ClevertapSegment.ClevertapServiceImpl{ + BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", + NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", + ConnectionConfig: &ClevertapSegment.ConnectionConfig{ + SourceId: "source123", + DestinationId: "destination456", + Enabled: true, + Config: ClevertapSegment.ConnConfig{ + Destination: ClevertapSegment.Destination{ + SchemaVersion: "v1.0", + SegmentName: "User Segment A", + AdminEmail: "admin@example.com", + SenderName: "Rudderstack", + }, + }, + }, + } + clevertapBulkUploader := ClevertapSegment.NewClevertapBulkUploader(logger.NOP, stats.NOP, + "CLEVERTAP_SEGMENT", + destination.Config["clevertapAccountKey"].(string), + destination.Config["clevertapAccountId"].(string), + endpoints, + clevertapService, + clevertapServiceImpl.ConnectionConfig, + ) + bulkUploader := common.SimpleAsyncDestinationManager{UploaderAndTransformer: clevertapBulkUploader} + asyncDestination := common.AsyncDestinationStruct{ + ImportingJobIDs: []int64{1, 2, 3, 4}, + FailedJobIDs: []int64{}, + FileName: filepath.Join(currentDir, "testdata/uploadData.txt"), + Destination: &destination, + } + + // Mock handling for MakeHTTPRequest + clevertapService.EXPECT(). + MakeHTTPRequest(gomock.Any()). + Return([]byte(`{"presignedS3URL": "https://abc.com", "expiry": "2023-12-31T23:59:59Z", "status": "success", "code": 200}`), 200, nil). + Times(1) + + // Mock expectations for UploadBulkFile + clevertapService.EXPECT(). + UploadBulkFile(gomock.Any(), "https://abc.com"). + Return(nil). + Times(1) + + clevertapService.EXPECT(). + MakeHTTPRequest(gomock.Any()). + Return([]byte(`{"error": "Email id is either not in the right format or does not belong to a valid admin", "status": "fail", "code": 401}`), 401, nil). + Times(1) + + expected := common.AsyncUploadOutput{ + AbortJobIDs: []int64{1, 2, 3, 4}, + FailedJobIDs: nil, + AbortCount: 4, + FailedCount: 0, + AbortReason: "Error while creating the segment Error while namimng segment: Email id is either not in the right format or does not belong to a valid admin", + } + + received := bulkUploader.Upload(&asyncDestination) + Expect(received).To(Equal(expected)) + }) + }) +}) diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go new file mode 100644 index 0000000000..dd2cf23ab1 --- /dev/null +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go @@ -0,0 +1,74 @@ +package clevertapSegment + +import ( + "fmt" + + jsoniter "github.com/json-iterator/go" + + "github.com/rudderlabs/rudder-go-kit/bytesize" + "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/stats" + + backendconfig "github.com/rudderlabs/rudder-server/backend-config" + "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" +) + +var jsonFast = jsoniter.ConfigCompatibleWithStandardLibrary + +func NewClevertapBulkUploader( + logger logger.Logger, + statsFactory stats.Stats, + destinationName, + accessToken, + appKey string, + clevertapEndpoints *Endpoints, + clevertap ClevertapService, + connectionConfig *ConnectionConfig, +) common.AsyncUploadAndTransformManager { + return &ClevertapBulkUploader{ + destName: destinationName, + logger: logger.Child("Clevertap").Child("ClevertapBulkUploader"), + statsFactory: statsFactory, + accessToken: accessToken, + appKey: appKey, + presignedURLEndpoint: clevertapEndpoints.BulkApi, + notifyEndpoint: clevertapEndpoints.NotifyApi, + fileSizeLimit: common.GetBatchRouterConfigInt64("MaxUploadLimit", destinationName, 5*bytesize.GB), + jobToCSVMap: map[int64]int64{}, + service: clevertap, + clevertapConnectionConfig: connectionConfig, + } +} + +func NewManager(logger logger.Logger, statsFactory stats.Stats, destination *backendconfig.DestinationT, connection *backendconfig.Connection) (common.AsyncDestinationManager, error) { + var destConfig DestinationConfig + destConfig, err := convert[map[string]interface{}, DestinationConfig](destination.Config) + if err != nil { + return nil, fmt.Errorf("error in converting destination config: %v", err) + } + + clevertapConnectionConfig, err := convertToConnectionConfig(connection) + if err != nil { + return nil, fmt.Errorf("error converting to connection config for clevertap segment: %v", err) + } + destName := destination.DestinationDefinition.Name + + clevertapService := &ClevertapServiceImpl{} + endpoints, err := getBulkApi(destConfig) + if err != nil { + return nil, fmt.Errorf("error getting bulk api for clevertap segment: %v", err) + } + + return common.SimpleAsyncDestinationManager{ + UploaderAndTransformer: NewClevertapBulkUploader( + logger, + statsFactory, + destName, + destConfig.AccessToken, + destConfig.AppKey, + endpoints, + clevertapService, + &clevertapConnectionConfig, + ), + }, nil +} diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/service.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/service.go new file mode 100644 index 0000000000..4ed1354c2f --- /dev/null +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/service.go @@ -0,0 +1,73 @@ +package clevertapSegment + +import ( + "fmt" + "io" + "net/http" + "os" +) + +type ClevertapServiceImpl struct { + BulkApi string + NotifyApi string + ConnectionConfig *ConnectionConfig +} + +func (u *ClevertapServiceImpl) MakeHTTPRequest(data *HttpRequestData) ([]byte, int, error) { + req, err := http.NewRequest(data.Method, data.Endpoint, data.Body) + if err != nil { + return nil, 500, err + } + req.Header.Add("X-CleverTap-Account-Id", data.appKey) + req.Header.Add("X-CleverTap-Passcode", data.accessToken) + req.Header.Add("content-type", data.ContentType) + client := &http.Client{} + res, err := client.Do(req) + if err != nil { + return nil, 500, err + } + defer res.Body.Close() + + body, err := io.ReadAll(res.Body) + if err != nil { + return nil, 500, err + } + return body, res.StatusCode, err +} + +func (u *ClevertapServiceImpl) UploadBulkFile(filePath, presignedURL string) error { + // Open the file + file, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + // Get the file information + fileInfo, err := file.Stat() + if err != nil { + return fmt.Errorf("failed to stat file: %w", err) + } + fileSize := fileInfo.Size() + + // Create the PUT request + req, err := http.NewRequest("PUT", presignedURL, file) + if err != nil { + return fmt.Errorf("failed to create PUT request: %w", err) + } + req.ContentLength = fileSize + // Execute the request + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to upload file: %w", err) + } + defer resp.Body.Close() + + // Check the response status + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("upload failed, status: %s, response: %s", resp.Status, string(body)) + } + return nil +} diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/testdata/uploadData.txt b/router/batchrouter/asyncdestinationmanager/clevertap_segment/testdata/uploadData.txt new file mode 100644 index 0000000000..a1b57c46f2 --- /dev/null +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/testdata/uploadData.txt @@ -0,0 +1,5 @@ +{"message":{"action":"insert","fields":{"i":"90478tyr3fgiwhbjwu98y3ruwhe","g":"12345"}},"metadata":{"job_id":1}} +{"message":{"action":"update","fields":{"i": "abc@xyz.com"}},"metadata":{"job_id":2}} +{"message":{"action":"update","fields":{"g": "6789"}},"metadata":{"job_id":3}} +{"message":{"action":"update","fields":{"g": "6543"}},"metadata":{"job_id":4}} + diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go new file mode 100644 index 0000000000..174fca62da --- /dev/null +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go @@ -0,0 +1,103 @@ +package clevertapSegment + +import ( + "encoding/csv" + "encoding/json" + "io" + "net/http" + "os" + + backendconfig "github.com/rudderlabs/rudder-server/backend-config" + "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" +) + +type DestinationConfig struct { + AppKey string `json:"appKey"` + AccessToken string `json:"accessToken"` + Region string `json:"region"` + OneTrustCookieCategories []OneTrustCookieCategory `json:"oneTrustCookieCategories"` +} + +type OneTrustCookieCategory struct { + OneTrustCookieCategory string `json:"oneTrustCookieCategory"` +} + +type HttpRequestData struct { + Body io.Reader + appKey string + accessToken string + Endpoint string + ContentType string + Method string +} + +type ActionFileInfo struct { + Action string + CSVWriter *csv.Writer + CSVFilePath string + SuccessfulJobIDs []int64 + FailedJobIDs []int64 + FileSize int64 + EventCount int64 + File *os.File +} + +type Message struct { + Action string `json:"action"` + Type string `json:"type"` + Channel string `json:"channel"` + Fields json.RawMessage `json:"fields"` + RecordId string `json:"recordId"` + Context json.RawMessage `json:"context"` +} + +type Metadata struct { + JobID int64 `json:"job_id"` +} + +type Data struct { + Message Message `json:"message"` + Metadata Metadata `json:"metadata"` +} + +const DEFAULT_SENDER_NAME = "Rudderstack" + +type Destination struct { + SchemaVersion string `json:"schemaVersion"` + SegmentName string `json:"segmentName"` + AdminEmail string `json:"adminEmail"` + SenderName string `json:"senderName"` +} + +type Endpoints struct { + BulkApi string + NotifyApi string +} + +type ConnConfig struct { + Destination Destination `json:"destination"` +} + +type ConnectionConfig struct { + SourceId string `json:"sourceId"` + DestinationId string `json:"destinationId"` + Enabled bool `json:"enabled"` + Config ConnConfig `json:"config"` +} + +type Uploader interface { + Upload(*common.AsyncDestinationStruct) common.AsyncUploadOutput + PopulateCsvFile(actionFile *ActionFileInfo, line string, data Data) error + convertToConnectionConfig(conn *backendconfig.Connection) (*ConnectionConfig, error) + getPresignedS3URL(string, string, ClevertapService) (string, error) + namingSegment(presignedURL, csvFilePath, appKey, accessToken string) error +} + +type HttpClient interface { + Do(req *http.Request) (*http.Response, error) +} + +type ClevertapService interface { + UploadBulkFile(filePath, presignedURL string) error + MakeHTTPRequest(data *HttpRequestData) ([]byte, int, error) +} diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/upload.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/upload.go new file mode 100644 index 0000000000..2af23b80c5 --- /dev/null +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/upload.go @@ -0,0 +1,367 @@ +package clevertapSegment + +import ( + "bufio" + "bytes" + "fmt" + "net/http" + "os" + "path/filepath" + "time" + + "github.com/google/uuid" + + "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/rudderlabs/rudder-server/jobsdb" + "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" + "github.com/rudderlabs/rudder-server/utils/misc" +) + +type ClevertapBulkUploader struct { + destName string + logger logger.Logger + statsFactory stats.Stats + appKey string + accessToken string + presignedURLEndpoint string + notifyEndpoint string + fileSizeLimit int64 + jobToCSVMap map[int64]int64 + service ClevertapService + clevertapConnectionConfig *ConnectionConfig +} + +func (*ClevertapBulkUploader) Transform(job *jobsdb.JobT) (string, error) { + return common.GetMarshalledData(string(job.EventPayload), job.JobID) +} + +func (u *ClevertapBulkUploader) populateCsvFile(actionFile *ActionFileInfo, line string, data Data) error { + newFileSize := actionFile.FileSize + int64(len(line)) + if newFileSize < u.fileSizeLimit { + actionFile.FileSize = newFileSize + actionFile.EventCount += 1 + + // Unmarshal Properties into a map of jsonLib.RawMessage + var fields map[string]interface{} + if err := jsonFast.Unmarshal(data.Message.Fields, &fields); err != nil { + return err + } + + // Check for presence of "i" and "g" values + if valueG, okG := fields["g"]; okG { + // If "g" exists, prioritize it and omit "i" + csvRow := []string{"g", fmt.Sprintf("%v", valueG)} // Type: g + if err := actionFile.CSVWriter.Write(csvRow); err != nil { + return err + } + } else if valueI, okI := fields["i"]; okI { + // Write "i" value only if "g" does not exist + csvRow := []string{"i", fmt.Sprintf("%v", valueI)} // Type: i + if err := actionFile.CSVWriter.Write(csvRow); err != nil { + return err + } + } + + // Write the CSV header only once + if actionFile.EventCount == 1 { + // Fixed headers + headers := []string{"Type", "Identity"} + if err := actionFile.CSVWriter.Write(headers); err != nil { + return err + } + } + actionFile.CSVWriter.Flush() + actionFile.SuccessfulJobIDs = append(actionFile.SuccessfulJobIDs, data.Metadata.JobID) + } else { + actionFile.FailedJobIDs = append(actionFile.FailedJobIDs, data.Metadata.JobID) + } + return nil +} + +func (u *ClevertapBulkUploader) createCSVFile(inputDataFilePath string) (*ActionFileInfo, error) { + // Create a temporary directory using misc.CreateTMPDIR + tmpDirPath, err := misc.CreateTMPDIR() + if err != nil { + return nil, fmt.Errorf("failed to create temporary directory: %v", err) + } + + // Define a local directory name within the temp directory + localTmpDirName := fmt.Sprintf("/%s/", misc.RudderAsyncDestinationLogs) + + // Combine the temporary directory with the local directory name and generate a unique file path + path := filepath.Join(tmpDirPath, localTmpDirName, uuid.NewString()) + csvFilePath := fmt.Sprintf("%v.csv", path) + + // Initialize the CSV writer with the generated file path + actionFile, err := createCSVWriter(csvFilePath) + if err != nil { + return nil, err + } + defer actionFile.File.Close() // Ensure the file is closed when done + + // // Store the CSV file path in the ActionFileInfo struct + // actionFile.CSVFilePath = csvFilePath + + // Create a scanner to read the existing file line by line + inputFile, err := os.Open(inputDataFilePath) + if err != nil { + return nil, fmt.Errorf("failed to open existing file") + } + defer inputFile.Close() + + scanner := bufio.NewScanner(inputFile) + scanner.Buffer(nil, 50000*1024) // maximum size of a single line that the scanner can read. Adjust the buffer size if necessary + + for scanner.Scan() { + line := scanner.Text() + var data Data + if err := jsonFast.Unmarshal([]byte(line), &data); err != nil { + // Collect the failed job ID only if it's valid + if data.Metadata.JobID != 0 { // Check if JobID is not zero + actionFile.FailedJobIDs = append(actionFile.FailedJobIDs, data.Metadata.JobID) + } + continue + } + + // Calculate the payload size and observe it + payloadSizeStat := u.statsFactory.NewTaggedStat("payload_size", stats.HistogramType, + map[string]string{ + "module": "batch_router", + "destType": u.destName, + }) + payloadSizeStat.Observe(float64(len(data.Message.Fields))) + + // Populate the CSV file and collect success/failure job IDs + err := u.populateCsvFile(actionFile, line, data) + if err != nil { + // Collect the failed job ID only if it's valid + if data.Metadata.JobID != 0 { // Check if JobID is not zero + actionFile.FailedJobIDs = append(actionFile.FailedJobIDs, data.Metadata.JobID) + } + } + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("error while scanning file: %v", err) + } + + // After processing, calculate the final file size + fileInfo, err := os.Stat(actionFile.CSVFilePath) + if err != nil { + return nil, fmt.Errorf("failed to retrieve file info: %v", err) + } + actionFile.FileSize = fileInfo.Size() + + return actionFile, nil +} + +/* +1. Read CSV file from the path +2. API call to get the presigned URL +3. API call to upload the file to the presigned URL +4. Check if the file is uploaded successfully +5. API call to name the segment +6. Remove the file after uploading +7. Return the success and failed job IDs +*/ +func (u *ClevertapBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStruct) common.AsyncUploadOutput { + destination := asyncDestStruct.Destination + filePath := asyncDestStruct.FileName + var failedJobs []int64 + var successJobs []int64 + + actionFiles, err := u.createCSVFile(filePath) + if err != nil { + return common.AsyncUploadOutput{ + FailedJobIDs: append(asyncDestStruct.FailedJobIDs, asyncDestStruct.ImportingJobIDs...), + FailedReason: fmt.Sprintf("got error while transforming the file. %v", err.Error()), + FailedCount: len(asyncDestStruct.FailedJobIDs) + len(asyncDestStruct.ImportingJobIDs), + DestinationID: destination.ID, + } + } + eventsOverLimit := u.statsFactory.NewTaggedStat("events_over_prescribed_limit", stats.CountType, map[string]string{ + "module": "batch_router", + "destType": u.destName, + "destinationID": destination.ID, + }) + + eventsOverLimit.Count(len(actionFiles.FailedJobIDs)) + + uploadTimeStat := u.statsFactory.NewTaggedStat("async_upload_time", stats.TimerType, map[string]string{ + "module": "batch_router", + "destType": u.destName, + "destinationID": destination.ID, + }) + + presignedURL, urlErr := u.getPresignedS3URL(u.appKey, u.accessToken) // API + + if urlErr != nil { + eventsAbortedStat := u.statsFactory.NewTaggedStat("failed_job_count", stats.CountType, map[string]string{ + "module": "batch_router", + "destType": u.destName, + "destinationID": destination.ID, + }) + eventsAbortedStat.Count(len(asyncDestStruct.ImportingJobIDs)) + return common.AsyncUploadOutput{ + AbortCount: len(asyncDestStruct.ImportingJobIDs), + DestinationID: asyncDestStruct.Destination.ID, + AbortJobIDs: asyncDestStruct.ImportingJobIDs, + AbortReason: fmt.Sprintf("%s %v", "Error while fetching presigned url", urlErr), + } + } + + startTime := time.Now() + errorDuringUpload := u.service.UploadBulkFile(actionFiles.CSVFilePath, presignedURL) // API + uploadTimeStat.Since(startTime) + + if errorDuringUpload != nil { + failedJobs = append(failedJobs, actionFiles.SuccessfulJobIDs...) + + // Append only failed job IDs if they exist + if len(actionFiles.FailedJobIDs) > 0 { + fmt.Println("Here") + failedJobs = append(failedJobs, actionFiles.FailedJobIDs...) + } + // remove the file that could not be uploaded + err = os.Remove(actionFiles.CSVFilePath) + if err != nil { + return common.AsyncUploadOutput{ + FailedJobIDs: append(asyncDestStruct.FailedJobIDs, asyncDestStruct.ImportingJobIDs...), + FailedReason: fmt.Sprintf("Error in removing zip file: %v", err.Error()), + FailedCount: len(asyncDestStruct.FailedJobIDs) + len(asyncDestStruct.ImportingJobIDs), + DestinationID: destination.ID, + } + } else { + return common.AsyncUploadOutput{ + FailedJobIDs: failedJobs, + FailedReason: fmt.Sprintf("error in uploading the bulk file: %v", errorDuringUpload.Error()), + FailedCount: len(asyncDestStruct.FailedJobIDs) + len(asyncDestStruct.ImportingJobIDs), + DestinationID: destination.ID, + } + } + } + + failedJobs = append(failedJobs, actionFiles.FailedJobIDs...) + successJobs = append(successJobs, actionFiles.SuccessfulJobIDs...) + + errorDuringNaming := u.namingSegment(presignedURL, actionFiles.CSVFilePath, u.appKey, u.accessToken) // API + + if errorDuringNaming != nil { + return common.AsyncUploadOutput{ + AbortCount: len(asyncDestStruct.ImportingJobIDs), + AbortJobIDs: asyncDestStruct.ImportingJobIDs, + AbortReason: fmt.Sprintf("%s %v", "Error while creating the segment", errorDuringNaming), + DestinationID: destination.ID, + } + } + + err = os.Remove(actionFiles.CSVFilePath) + if err != nil { + u.logger.Error("Error in removing zip file: %v", err) + } + + return common.AsyncUploadOutput{ + ImportingJobIDs: successJobs, + FailedJobIDs: append(asyncDestStruct.FailedJobIDs, failedJobs...), + ImportingCount: len(successJobs), + FailedCount: len(asyncDestStruct.FailedJobIDs) + len(failedJobs), + DestinationID: destination.ID, + } +} + +func (u *ClevertapBulkUploader) getPresignedS3URL(appKey, accessToken string) (string, error) { + data := &HttpRequestData{ + Method: http.MethodPost, + Endpoint: u.presignedURLEndpoint, + ContentType: "application/json", + appKey: appKey, + accessToken: accessToken, + } + + body, statusCode, err := u.service.MakeHTTPRequest(data) + if err != nil { + return "", err + } + + // Parse the response + var result struct { + PresignedS3URL string `json:"presignedS3URL"` + Expiry string `json:"expiry"` + Status string `json:"status"` + Error string `json:"error"` + Code int `json:"code"` + } + if err := jsonFast.Unmarshal(body, &result); err != nil { + return "", err + } + + if statusCode != 200 { + err := fmt.Errorf("Error while fetching preSignedUrl: %s", result.Error) + return "", err + } + + if result.PresignedS3URL == "" { + err := fmt.Errorf("presigned URL is empty after parsing") + return "", err + } + + return result.PresignedS3URL, nil +} + +func (u *ClevertapBulkUploader) namingSegment(presignedURL, csvFilePath, appKey, accessToken string) error { + url := u.notifyEndpoint + + // Construct the request payload + payload := map[string]interface{}{ + "name": u.clevertapConnectionConfig.Config.Destination.SegmentName, + "email": u.clevertapConnectionConfig.Config.Destination.AdminEmail, + "filename": csvFilePath, + "creator": u.clevertapConnectionConfig.Config.Destination.SenderName, + "url": presignedURL, + "replace": true, + } + + payloadBytes, err := jsonFast.Marshal(payload) + if err != nil { + return err + } + + // Create HttpRequestData + data := &HttpRequestData{ + Method: http.MethodPost, + Endpoint: url, + Body: bytes.NewBuffer(payloadBytes), + ContentType: "application/json", + appKey: appKey, + accessToken: accessToken, + } + + // Use MakeHTTPRequest to send the request + body, statusCode, err := u.service.MakeHTTPRequest(data) + if err != nil { + return err + } + // Parse the response + var result struct { + SegmentID int `json:"Segment ID"` + Status string `json:"status"` + Error string `json:"error"` + Code int `json:"code"` + } + if err := jsonFast.Unmarshal(body, &result); err != nil { + return err + } + + if statusCode != 200 { + err := fmt.Errorf("Error while namimng segment: %s", result.Error) + return err + } + + if result.SegmentID == 0 { + err := fmt.Errorf("Segment Creation is Unsuccessful") + return err + } + + return nil +} diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go new file mode 100644 index 0000000000..9a74ec70fe --- /dev/null +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go @@ -0,0 +1,97 @@ +package clevertapSegment + +import ( + "encoding/csv" + "fmt" + "os" + "strings" + + backendconfig "github.com/rudderlabs/rudder-server/backend-config" +) + +func createCSVWriter(fileName string) (*ActionFileInfo, error) { + // Open or create the file where the CSV will be written + file, err := os.Create(fileName) + if err != nil { + return nil, fmt.Errorf("failed to create file: %v", err) + } + + // Create a new CSV writer using the file + csvWriter := csv.NewWriter(file) + + // Return the ActionFileInfo struct with the CSV writer, file, and file path + return &ActionFileInfo{ + CSVWriter: csvWriter, + File: file, + CSVFilePath: fileName, + }, nil +} + +func getBulkApi(destConfig DestinationConfig) (*Endpoints, error) { + endpoint, err := getCleverTapEndpoint(destConfig.Region) + if err != nil { + return nil, err + } + return &Endpoints{ + BulkApi: fmt.Sprintf("https://%s/get_custom_list_segment_url", endpoint), + NotifyApi: fmt.Sprintf("https://%s/upload_custom_list_segment_completed", endpoint), + }, nil +} + +// getCleverTapEndpoint returns the API endpoint for the given region +func getCleverTapEndpoint(region string) (string, error) { + // Mapping of regions to endpoints + endpoints := map[string]string{ + "IN": "in1.api.clevertap.com", + "SINGAPORE": "sg1.api.clevertap.com", + "US": "us1.api.clevertap.com", + "INDONESIA": "aps3.api.clevertap.com", + "UAE": "mec1.api.clevertap.com", + "EU": "api.clevertap.com", + } + + // Normalize the region input to uppercase for case-insensitivity + region = strings.ToUpper(region) + + // Check if the region exists in the map + if endpoint, exists := endpoints[region]; exists { + return endpoint, nil + } + + // Return an error if the region is not recognized + return "", fmt.Errorf("unknown region: %s", region) +} + +type ConfigOutput interface { + ConnectionConfig | DestinationConfig +} + +// Generic function to convert input to output using JSON marshal/unmarshal +func convert[T any, U ConfigOutput](input T) (U, error) { + var output U + + // Marshal the input to JSON + data, err := jsonFast.Marshal(input) + if err != nil { + return output, fmt.Errorf("failed to marshal input: %w", err) + } + + // Unmarshal the JSON into the output type + err = jsonFast.Unmarshal(data, &output) + if err != nil { + return output, fmt.Errorf("failed to unmarshal to output type: %w", err) + } + + return output, nil +} + +func convertToConnectionConfig(conn *backendconfig.Connection) (ConnectionConfig, error) { + connConfig, err := convert[map[string]interface{}, ConnectionConfig](conn.Config) + if err != nil { + return ConnectionConfig{}, fmt.Errorf("failed to convert to connection config: %w", err) + } + if connConfig.Config.Destination.SenderName == "" { + connConfig.Config.Destination.SenderName = DEFAULT_SENDER_NAME + } + return connConfig, nil +} diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils_test.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils_test.go new file mode 100644 index 0000000000..534681acfb --- /dev/null +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils_test.go @@ -0,0 +1,110 @@ +package clevertapSegment + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + backendconfig "github.com/rudderlabs/rudder-server/backend-config" +) + +func TestGetCleverTapEndpoint(t *testing.T) { + tests := []struct { + region string + expectErr bool + expectURL string + }{ + {region: "IN", expectURL: "in1.api.clevertap.com"}, + {region: "SINGAPORE", expectURL: "sg1.api.clevertap.com"}, + {region: "US", expectURL: "us1.api.clevertap.com"}, + {region: "UNKNOWN", expectErr: true, expectURL: ""}, + } + + for _, tc := range tests { + t.Run(tc.region, func(t *testing.T) { + endpoint, err := getCleverTapEndpoint(tc.region) + + if tc.expectErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + assert.Equal(t, endpoint, tc.expectURL) + }) + } +} + +func TestGetBulkApi(t *testing.T) { + destConfig := DestinationConfig{Region: "IN"} + assert.Equal(t, "IN", destConfig.Region) + + endpoints, err := getBulkApi(destConfig) + assert.Nil(t, err) + assert.NotNil(t, endpoints) + assert.NotEmpty(t, endpoints.BulkApi) + assert.NotEmpty(t, endpoints.NotifyApi) + assert.Equal(t, "https://in1.api.clevertap.com/get_custom_list_segment_url", endpoints.BulkApi) + assert.Equal(t, "https://in1.api.clevertap.com/upload_custom_list_segment_completed", endpoints.NotifyApi) +} + +func TestConvertToConnectionConfig(t *testing.T) { + type expected struct { + senderName string + isErr bool + } + tests := []struct { + conn *backendconfig.Connection + expected expected + }{ + { + conn: &backendconfig.Connection{ + SourceID: "source123", + DestinationID: "destination456", + Enabled: true, + Config: map[string]interface{}{ + "invalidKey": make(chan int), // Channels cannot be marshaled to JSON + }, + }, + expected: expected{ + isErr: true, + }, + }, + { + conn: &backendconfig.Connection{ + SourceID: "source123", + DestinationID: "destination456", + Enabled: true, + Config: map[string]interface{}{ + "Destination": map[string]interface{}{ + "SchemaVersion": "v1.0", + "SegmentName": "User Segment A", + "AdminEmail": "admin@example.com", + "SenderName": "Rudderstack", + }, + }, + }, + expected: expected{ + senderName: "Rudderstack", + }, + }, + } + + for _, tc := range tests { + t.Run("", func(t *testing.T) { + connConfig, err := convertToConnectionConfig(tc.conn) + + if tc.expected.isErr { + assert.Error(t, err) + assert.Equal(t, "", connConfig.Config.Destination.SenderName) + return + } + + assert.NoError(t, err) + if connConfig.Config.Destination.SenderName == "" { + assert.Equal(t, DEFAULT_SENDER_NAME, connConfig.Config.Destination.SenderName) + } else { + assert.Equal(t, tc.expected.senderName, connConfig.Config.Destination.SenderName) + } + }) + } +} diff --git a/router/batchrouter/asyncdestinationmanager/common/common.go b/router/batchrouter/asyncdestinationmanager/common/common.go index 7ae3df5211..b87d40cd0d 100644 --- a/router/batchrouter/asyncdestinationmanager/common/common.go +++ b/router/batchrouter/asyncdestinationmanager/common/common.go @@ -114,6 +114,7 @@ type AsyncDestinationStruct struct { UploadMutex sync.RWMutex DestinationUploadURL string Destination *backendconfig.DestinationT + Connection *backendconfig.Connection Manager AsyncDestinationManager AttemptNums map[int64]int FirstAttemptedAts map[int64]time.Time diff --git a/router/batchrouter/asyncdestinationmanager/common/utils.go b/router/batchrouter/asyncdestinationmanager/common/utils.go index df711f816e..fbe1fd3bd9 100644 --- a/router/batchrouter/asyncdestinationmanager/common/utils.go +++ b/router/batchrouter/asyncdestinationmanager/common/utils.go @@ -3,7 +3,7 @@ package common import "slices" var ( - asyncDestinations = []string{"MARKETO_BULK_UPLOAD", "BINGADS_AUDIENCE", "ELOQUA", "YANDEX_METRICA_OFFLINE_EVENTS", "BINGADS_OFFLINE_CONVERSIONS", "KLAVIYO_BULK_UPLOAD", "LYTICS_BULK_UPLOAD", "SNOWPIPE_STREAMING"} + asyncDestinations = []string{"MARKETO_BULK_UPLOAD", "BINGADS_AUDIENCE", "ELOQUA", "YANDEX_METRICA_OFFLINE_EVENTS", "BINGADS_OFFLINE_CONVERSIONS", "KLAVIYO_BULK_UPLOAD", "LYTICS_BULK_UPLOAD", "SNOWPIPE_STREAMING", "CLEVERTAP_SEGMENT"} sftpDestinations = []string{"SFTP"} ) diff --git a/router/batchrouter/asyncdestinationmanager/manager.go b/router/batchrouter/asyncdestinationmanager/manager.go index 76d8e83af1..5f12055625 100644 --- a/router/batchrouter/asyncdestinationmanager/manager.go +++ b/router/batchrouter/asyncdestinationmanager/manager.go @@ -10,6 +10,7 @@ import ( backendconfig "github.com/rudderlabs/rudder-server/backend-config" bingadsaudience "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/bing-ads/audience" bingadsofflineconversions "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions" + clevertapSegment "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/clevertap_segment" "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/eloqua" "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/klaviyobulkupload" @@ -26,6 +27,7 @@ func newRegularManager( statsFactory stats.Stats, destination *backendconfig.DestinationT, backendConfig backendconfig.BackendConfig, + connection *backendconfig.Connection, ) (common.AsyncDestinationManager, error) { switch destination.DestinationDefinition.Name { case "BINGADS_AUDIENCE": @@ -42,6 +44,8 @@ func newRegularManager( return klaviyobulkupload.NewManager(logger, statsFactory, destination) case "LYTICS_BULK_UPLOAD": return lyticsBulkUpload.NewManager(logger, statsFactory, destination) + case "CLEVERTAP_SEGMENT": + return clevertapSegment.NewManager(logger, statsFactory, destination, connection) case "SNOWPIPE_STREAMING": return snowpipestreaming.New(conf, logger, statsFactory, destination), nil } @@ -62,10 +66,11 @@ func NewManager( statsFactory stats.Stats, destination *backendconfig.DestinationT, backendConfig backendconfig.BackendConfig, + connection *backendconfig.Connection, ) (common.AsyncDestinationManager, error) { switch { case common.IsAsyncRegularDestination(destination.DestinationDefinition.Name): - return newRegularManager(conf, logger, statsFactory, destination, backendConfig) + return newRegularManager(conf, logger, statsFactory, destination, backendConfig, connection) case common.IsSFTPDestination(destination.DestinationDefinition.Name): return newSFTPManager(logger, statsFactory, destination) } diff --git a/router/batchrouter/handle_lifecycle.go b/router/batchrouter/handle_lifecycle.go index 4d1698fd56..ebd0ed5dbb 100644 --- a/router/batchrouter/handle_lifecycle.go +++ b/router/batchrouter/handle_lifecycle.go @@ -261,9 +261,9 @@ func (brt *Handle) Shutdown() { _ = brt.backgroundWait() } -func (brt *Handle) initAsyncDestinationStruct(destination *backendconfig.DestinationT) { +func (brt *Handle) initAsyncDestinationStruct(destination *backendconfig.DestinationT, connection *backendconfig.Connection) { _, ok := brt.asyncDestinationStruct[destination.ID] - manager, err := asyncdestinationmanager.NewManager(brt.conf, brt.logger.Child("asyncdestinationmanager"), stats.Default, destination, brt.backendConfig) + manager, err := asyncdestinationmanager.NewManager(brt.conf, brt.logger.Child("asyncdestinationmanager"), stats.Default, destination, brt.backendConfig, connection) if err != nil { brt.logger.Errorf("BRT: Error initializing async destination struct for %s destination: %v", destination.Name, err) destInitFailStat := stats.Default.NewTaggedStat("destination_initialization_fail", stats.CountType, map[string]string{ @@ -277,17 +277,18 @@ func (brt *Handle) initAsyncDestinationStruct(destination *backendconfig.Destina brt.asyncDestinationStruct[destination.ID] = &asynccommon.AsyncDestinationStruct{} } brt.asyncDestinationStruct[destination.ID].Destination = destination + brt.asyncDestinationStruct[destination.ID].Connection = connection brt.asyncDestinationStruct[destination.ID].Manager = manager } -func (brt *Handle) refreshDestination(destination backendconfig.DestinationT) { +func (brt *Handle) refreshDestination(destination backendconfig.DestinationT, connection backendconfig.Connection) { if asynccommon.IsAsyncDestination(destination.DestinationDefinition.Name) { asyncDestStruct, ok := brt.asyncDestinationStruct[destination.ID] if ok && asyncDestStruct.Destination != nil && asyncDestStruct.Destination.RevisionID == destination.RevisionID { return } - brt.initAsyncDestinationStruct(&destination) + brt.initAsyncDestinationStruct(&destination, &connection) } } @@ -395,7 +396,7 @@ func (brt *Handle) backendConfigSubscriber() { uploadIntervalMap[destination.ID] = brt.uploadInterval(destination.Config) } destinationsMap[destination.ID].Sources = append(destinationsMap[destination.ID].Sources, source) - brt.refreshDestination(destination) + brt.refreshDestination(destination, wConfig.Connections[destination.ID]) // initialize map to track encountered anonymousIds for a warehouse destination if warehouseutils.IDResolutionEnabled() && slices.Contains(warehouseutils.IdentityEnabledWarehouses, brt.destType) {