From 937209642773d0cdebbe09e1880669f99a8c121c Mon Sep 17 00:00:00 2001 From: shrouti1507 Date: Sat, 7 Dec 2024 23:33:18 +0530 Subject: [PATCH 01/10] feat: clevertap segment initial commit --- .../clevertap_segment/clevertapSegment.go | 319 ++++++++++++++++++ .../clevertap_segment/manager.go | 49 +++ .../clevertap_segment/types.go | 112 ++++++ .../clevertap_segment/utils.go | 149 ++++++++ .../asyncdestinationmanager/common/common.go | 1 + .../asyncdestinationmanager/common/utils.go | 2 +- .../asyncdestinationmanager/manager.go | 7 +- router/batchrouter/handle_lifecycle.go | 11 +- 8 files changed, 643 insertions(+), 7 deletions(-) create mode 100644 router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go create mode 100644 router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go create mode 100644 router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go create mode 100644 router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go new file mode 100644 index 0000000000..52162bf86f --- /dev/null +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go @@ -0,0 +1,319 @@ +package clevertapSegment + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "strings" + "time" + + "github.com/rudderlabs/rudder-go-kit/stats" + + backendconfig "github.com/rudderlabs/rudder-server/backend-config" + "github.com/rudderlabs/rudder-server/jobsdb" + "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" +) + +type clevertapServiceImpl struct { + BulkApi string + NotifyApi string + ConnectionConfig *ConnectionConfig +} + +// GetCleverTapEndpoint returns the API endpoint for the given region +func (u *clevertapServiceImpl) getCleverTapEndpoint(region string) (string, error) { + // Mapping of regions to endpoints + endpoints := map[string]string{ + "IN": "in1.api.clevertap.com", + "SINGAPORE": "sg1.api.clevertap.com", + "US": "us1.api.clevertap.com", + "INDONESIA": "aps3.api.clevertap.com", + "UAE": "mec1.api.clevertap.com", + "EU": "api.clevertap.com", + } + + // Normalize the region input to uppercase for case-insensitivity + region = strings.ToUpper(region) + + // Check if the region exists in the map + if endpoint, exists := endpoints[region]; exists { + return endpoint, nil + } + + // Return an error if the region is not recognized + return "", fmt.Errorf("unknown region: %s", region) +} + +func (u *clevertapServiceImpl) getBulkApi(destConfig DestinationConfig) *clevertapServiceImpl { + endpoint, err := u.getCleverTapEndpoint(destConfig.region) + if err != nil { + return nil + } + return &clevertapServiceImpl{ + BulkApi: fmt.Sprintf("https://%s/get_custom_list_segment_url", endpoint), + NotifyApi: fmt.Sprintf("https://%s/upload_custom_list_segment_completed", endpoint), + } +} + +func (*ClevertapBulkUploader) Transform(job *jobsdb.JobT) (string, error) { + return common.GetMarshalledData(string(job.EventPayload), job.JobID) +} + +func (u *clevertapServiceImpl) MakeHTTPRequest(data *HttpRequestData) ([]byte, int, error) { + req, err := http.NewRequest(data.Method, data.Endpoint, data.Body) + if err != nil { + return nil, 500, err + } + req.Header.Add("X-CleverTap-Account-Id", data.appKey) + req.Header.Add("X-CleverTap-Passcode", data.accessToken) + req.Header.Add("content-type", data.ContentType) + client := &http.Client{} + res, err := client.Do(req) + if err != nil { + return nil, 500, err + } + defer res.Body.Close() + + body, err := io.ReadAll(res.Body) + if err != nil { + return nil, 500, err + } + return body, res.StatusCode, err +} + +func (u *clevertapServiceImpl) UploadBulkFile(filePath string, presignedURL string) error { + // Open the file + file, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + // Get the file information + fileInfo, err := file.Stat() + if err != nil { + return fmt.Errorf("failed to stat file: %w", err) + } + fileSize := fileInfo.Size() + + fmt.Printf("Uploading file: %s, Size: %d bytes\n", fileInfo.Name(), fileSize) + + // Create the PUT request + req, err := http.NewRequest("PUT", presignedURL, file) + if err != nil { + return fmt.Errorf("failed to create PUT request: %w", err) + } + req.ContentLength = fileSize + // Execute the request + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to upload file: %w", err) + } + defer resp.Body.Close() + + // Check the response status + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("upload failed, status: %s, response: %s", resp.Status, string(body)) + } + + fmt.Println("CSV file uploaded successfully!") + return nil +} + +func (u *clevertapServiceImpl) getPresignedS3URL(appKey string, accessToken string) (string, error) { + data := &HttpRequestData{ + Method: http.MethodPost, + Endpoint: u.BulkApi, + ContentType: "application/json", + appKey: appKey, + accessToken: accessToken, + } + + body, _, err := u.MakeHTTPRequest(data) + if err != nil { + return "", err + } + + // Parse the response + var result struct { + PresignedS3URL string `json:"presignedS3URL"` + } + if err := json.Unmarshal(body, &result); err != nil { + return "", err + } + + if result.PresignedS3URL == "" { + return "", fmt.Errorf("presigned URL is empty after parsing") + } + + return result.PresignedS3URL, nil +} + +// Function to convert *backendconfig.Connection to ConnectionConfig using marshal and unmarshal +func (u *clevertapServiceImpl) convertToConnectionConfig(conn *backendconfig.Connection) (*ConnectionConfig, error) { + if conn == nil { + return nil, fmt.Errorf("connection is nil") + } + + // Marshal the backendconfig.Connection to JSON + data, err := json.Marshal(conn) + if err != nil { + return nil, fmt.Errorf("failed to marshal connection: %w", err) + } + + // Unmarshal the JSON into ConnectionConfig + var connConfig ConnectionConfig + err = json.Unmarshal(data, &connConfig) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal to ConnectionConfig: %w", err) + } + + // Set default SenderName if it is empty + if connConfig.Config.Destination.SenderName == "" { + connConfig.Config.Destination.SenderName = "Rudderstack" + } + + return &connConfig, nil +} + +func (u *clevertapServiceImpl) namingSegment(destination *backendconfig.DestinationT, presignedURL, csvFilePath, appKey, accessToken string) error { + url := u.NotifyApi + + // Construct the request payload + payload := map[string]interface{}{ + "name": u.ConnectionConfig.Config.Destination.SegmentName, + "email": u.ConnectionConfig.Config.Destination.AdminEmail, + "filename": csvFilePath, + "creator": u.ConnectionConfig.Config.Destination.SenderName, + "url": presignedURL, + "replace": false, + } + + payloadBytes, err := json.Marshal(payload) + if err != nil { + return err + } + + // Create HttpRequestData + data := &HttpRequestData{ + Method: http.MethodPost, + Endpoint: url, + Body: bytes.NewBuffer(payloadBytes), + ContentType: "application/json", + appKey: appKey, + accessToken: accessToken, + } + + // Use MakeHTTPRequest to send the request + _, _, err = u.MakeHTTPRequest(data) + if err != nil { + return err + } + + return nil +} + +func (u *ClevertapBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStruct) common.AsyncUploadOutput { + destination := asyncDestStruct.Destination + // connection := asyncDestStruct.Connection + filePath := asyncDestStruct.FileName + var failedJobs []int64 + var successJobs []int64 + + actionFiles, err := u.createCSVFile(filePath) + if err != nil { + return common.AsyncUploadOutput{ + FailedJobIDs: append(asyncDestStruct.FailedJobIDs, asyncDestStruct.ImportingJobIDs...), + FailedReason: fmt.Sprintf("got error while transforming the file. %v", err.Error()), + FailedCount: len(asyncDestStruct.FailedJobIDs) + len(asyncDestStruct.ImportingJobIDs), + DestinationID: destination.ID, + } + } + uploadRetryableStat := u.statsFactory.NewTaggedStat("events_over_prescribed_limit", stats.CountType, map[string]string{ + "module": "batch_router", + "destType": u.destName, + }) + + uploadRetryableStat.Count(len(actionFiles.FailedJobIDs)) + + uploadTimeStat := u.statsFactory.NewTaggedStat("async_upload_time", stats.TimerType, map[string]string{ + "module": "batch_router", + "destType": u.destName, + }) + + presignedURL, urlErr := u.service.getPresignedS3URL(u.appKey, u.accessToken) + + if urlErr != nil { + eventsAbortedStat := u.statsFactory.NewTaggedStat("failed_job_count", stats.CountType, map[string]string{ + "module": "batch_router", + "destType": u.destName, + }) + eventsAbortedStat.Count(len(asyncDestStruct.ImportingJobIDs)) + return common.AsyncUploadOutput{ + AbortCount: len(asyncDestStruct.ImportingJobIDs), + DestinationID: asyncDestStruct.Destination.ID, + AbortJobIDs: asyncDestStruct.ImportingJobIDs, + AbortReason: fmt.Sprintf("%s %v", "Error while fetching presigned url", err.Error()), + } + } + + startTime := time.Now() + errorDuringUpload := u.service.UploadBulkFile(actionFiles.CSVFilePath, presignedURL) + uploadTimeStat.Since(startTime) + + if errorDuringUpload != nil { + u.logger.Error("error in uploading the bulk file: %v", errorDuringUpload) + failedJobs = append(append(failedJobs, actionFiles.SuccessfulJobIDs...), actionFiles.FailedJobIDs...) + // remove the file that could not be uploaded + err = os.Remove(actionFiles.CSVFilePath) + if err != nil { + return common.AsyncUploadOutput{ + FailedJobIDs: append(asyncDestStruct.FailedJobIDs, asyncDestStruct.ImportingJobIDs...), + FailedReason: fmt.Sprintf("Error in removing zip file: %v", err.Error()), + FailedCount: len(asyncDestStruct.FailedJobIDs) + len(asyncDestStruct.ImportingJobIDs), + DestinationID: destination.ID, + } + } else { + return common.AsyncUploadOutput{ + FailedJobIDs: failedJobs, + FailedReason: fmt.Sprintf("error in uploading the bulk file: %v", errorDuringUpload.Error()), + FailedCount: len(asyncDestStruct.FailedJobIDs) + len(asyncDestStruct.ImportingJobIDs), + DestinationID: destination.ID, + } + } + } + + failedJobs = append(failedJobs, actionFiles.FailedJobIDs...) + successJobs = append(successJobs, actionFiles.SuccessfulJobIDs...) + + errorDuringNaming := u.service.namingSegment(destination, presignedURL, actionFiles.CSVFilePath, u.appKey, u.accessToken) + + if errorDuringNaming != nil { + // Handle error appropriately, e.g., log it or return it + u.logger.Error("Error during naming segment: %v", errorDuringNaming) + return common.AsyncUploadOutput{ + FailedJobIDs: append(asyncDestStruct.FailedJobIDs, failedJobs...), + FailedReason: fmt.Sprintf("Error during naming segment: %v", errorDuringNaming.Error()), + FailedCount: len(asyncDestStruct.FailedJobIDs) + len(failedJobs), + DestinationID: destination.ID, + } + } + + err = os.Remove(actionFiles.CSVFilePath) + if err != nil { + u.logger.Error("Error in removing zip file: %v", err) + } + + return common.AsyncUploadOutput{ + ImportingJobIDs: successJobs, + FailedJobIDs: append(asyncDestStruct.FailedJobIDs, failedJobs...), + ImportingCount: len(successJobs), + FailedCount: len(asyncDestStruct.FailedJobIDs) + len(failedJobs), + DestinationID: destination.ID, + } +} diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go new file mode 100644 index 0000000000..4513cb277e --- /dev/null +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go @@ -0,0 +1,49 @@ +package clevertapSegment + +import ( + "encoding/json" + "fmt" + + "github.com/rudderlabs/rudder-go-kit/bytesize" + "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/stats" + + backendconfig "github.com/rudderlabs/rudder-server/backend-config" + "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" +) + +func NewClevertapBulkUploader(logger logger.Logger, statsFactory stats.Stats, destinationName, accessToken, appKey, endpoint string, clevertap clevertapService, connectionConfig *ConnectionConfig) common.AsyncUploadAndTransformManager { + return &ClevertapBulkUploader{ + destName: destinationName, + logger: logger.Child("Clevertap").Child("ClevertapBulkUploader"), + statsFactory: statsFactory, + accessToken: accessToken, + appKey: appKey, + baseEndpoint: endpoint, + fileSizeLimit: common.GetBatchRouterConfigInt64("MaxUploadLimit", destinationName, 5*bytesize.GB), + jobToCSVMap: map[int64]int64{}, + service: clevertap, + clevertapConnectionConfig: connectionConfig, + } +} + +func NewManager(logger logger.Logger, statsFactory stats.Stats, destination *backendconfig.DestinationT, connection *backendconfig.Connection) (common.AsyncDestinationManager, error) { + destConfig := DestinationConfig{} + jsonConfig, err := json.Marshal(destination.Config) + if err != nil { + return nil, fmt.Errorf("error in marshalling destination config: %v", err) + } + err = json.Unmarshal(jsonConfig, &destConfig) + if err != nil { + return nil, fmt.Errorf("error in unmarshalling destination config: %v", err) + } + destName := destination.DestinationDefinition.Name + + clevertapService := &clevertapServiceImpl{} + clevertapImpl := clevertapService.getBulkApi(destConfig) + clevertapConnectionConfig, err := clevertapService.convertToConnectionConfig(connection) + + return common.SimpleAsyncDestinationManager{ + UploaderAndTransformer: NewClevertapBulkUploader(logger, statsFactory, destName, destConfig.AccessToken, destConfig.AppKey, clevertapImpl.BulkApi, clevertapService, clevertapConnectionConfig), + }, nil +} diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go new file mode 100644 index 0000000000..d4ca60c570 --- /dev/null +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go @@ -0,0 +1,112 @@ +package clevertapSegment + +import ( + "context" + "encoding/csv" + "encoding/json" + "io" + "net/http" + "os" + + "github.com/rudderlabs/rudder-go-kit/stats" + + "github.com/rudderlabs/rudder-go-kit/logger" + + backendconfig "github.com/rudderlabs/rudder-server/backend-config" + "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" +) + +type ClevertapBulkUploader struct { + destName string + logger logger.Logger + statsFactory stats.Stats + appKey string + accessToken string + baseEndpoint string + fileSizeLimit int64 + jobToCSVMap map[int64]int64 + service clevertapService + clevertapConnectionConfig *ConnectionConfig +} + +type DestinationConfig struct { + AppKey string `json:"appKey"` + AccessToken string `json:"accessToken"` + region string `json:"region"` + OneTrustCookieCategories []OneTrustCookieCategory `json:"oneTrustCookieCategories"` +} + +type OneTrustCookieCategory struct { + OneTrustCookieCategory string `json:"oneTrustCookieCategory"` +} + +type HttpRequestData struct { + Body io.Reader + appKey string + accessToken string + Endpoint string + ContentType string + Method string +} + +type ActionFileInfo struct { + Action string + CSVWriter *csv.Writer + CSVFilePath string + SuccessfulJobIDs []int64 + FailedJobIDs []int64 + FileSize int64 + EventCount int64 + File *os.File +} + +type Message struct { + Action string `json:"action"` + Type string `json:"type"` + Channel string `json:"channel"` + Fields json.RawMessage `json:"fields"` + RecordId string `json:"recordId"` + Context json.RawMessage `json:"context"` +} + +type Metadata struct { + JobID int64 `json:"job_id"` +} + +type Data struct { + Message Message `json:"message"` + Metadata Metadata `json:"metadata"` +} + +type ConnectionConfig struct { + SourceID string `json:"sourceId"` + DestinationID string `json:"destinationId"` + Enabled bool `json:"enabled"` + Config struct { + Destination struct { + SchemaVersion string `json:"schemaVersion"` + SegmentName string `json:"segmentName"` + AdminEmail string `json:"adminEmail"` + SenderName string `json:"senderName"` + } `json:"destination"` + } `json:"config"` +} + +type Uploader interface { + Upload(*common.AsyncDestinationStruct) common.AsyncUploadOutput + UploadBulkFile(ctx context.Context, filePath string) (bool, error) + PopulateCsvFile(actionFile *ActionFileInfo, line string, data Data) error + convertToConnectionConfig(conn *backendconfig.Connection) (*ConnectionConfig, error) + namingSegment(destination *backendconfig.DestinationT, presignedURL, csvFilePath string) error +} + +type HttpClient interface { + Do(req *http.Request) (*http.Response, error) +} + +type clevertapService interface { + UploadBulkFile(filePath string, presignedURL string) error + MakeHTTPRequest(data *HttpRequestData) ([]byte, int, error) + getPresignedS3URL(string, string) (string, error) + namingSegment(destination *backendconfig.DestinationT, presignedURL, csvFilePath, appKey, accessToken string) error +} diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go new file mode 100644 index 0000000000..bd25df59cc --- /dev/null +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go @@ -0,0 +1,149 @@ +package clevertapSegment + +import ( + "bufio" + "encoding/csv" + "fmt" + "os" + "path/filepath" + + "github.com/google/uuid" + jsoniter "github.com/json-iterator/go" + + "github.com/rudderlabs/rudder-go-kit/stats" + + "github.com/rudderlabs/rudder-server/utils/misc" +) + +func (u *ClevertapBulkUploader) PopulateCsvFile(actionFile *ActionFileInfo, line string, data Data) error { + newFileSize := actionFile.FileSize + int64(len(line)) + if newFileSize < u.fileSizeLimit { + actionFile.FileSize = newFileSize + actionFile.EventCount += 1 + + // Unmarshal Properties into a map of json.RawMessage + var fields map[string]interface{} + if err := jsoniter.Unmarshal(data.Message.Fields, &fields); err != nil { + return err + } + + // Check for presence of "i" and "g" values + if valueG, okG := fields["g"]; okG { + // If "g" exists, prioritize it and omit "i" + csvRow := []string{"g", fmt.Sprintf("%v", valueG)} // Type: g + if err := actionFile.CSVWriter.Write(csvRow); err != nil { + return err + } + } else if valueI, okI := fields["i"]; okI { + // Write "i" value only if "g" does not exist + csvRow := []string{"i", fmt.Sprintf("%v", valueI)} // Type: i + if err := actionFile.CSVWriter.Write(csvRow); err != nil { + return err + } + } + + // Write the CSV header only once + if actionFile.EventCount == 1 { + // Fixed headers + headers := []string{"Type", "Identity"} + if err := actionFile.CSVWriter.Write(headers); err != nil { + return err + } + } + actionFile.CSVWriter.Flush() + actionFile.SuccessfulJobIDs = append(actionFile.SuccessfulJobIDs, data.Metadata.JobID) + } else { + actionFile.FailedJobIDs = append(actionFile.FailedJobIDs, data.Metadata.JobID) + } + return nil +} + +func createCSVWriter(fileName string) (*ActionFileInfo, error) { + // Open or create the file where the CSV will be written + file, err := os.Create(fileName) + if err != nil { + return nil, fmt.Errorf("failed to create file: %v", err) + } + + // Create a new CSV writer using the file + csvWriter := csv.NewWriter(file) + + // Return the ActionFileInfo struct with the CSV writer, file, and file path + return &ActionFileInfo{ + CSVWriter: csvWriter, + File: file, + CSVFilePath: fileName, + }, nil +} + +func (u *ClevertapBulkUploader) createCSVFile(existingFilePath string) (*ActionFileInfo, error) { + // Create a temporary directory using misc.CreateTMPDIR + tmpDirPath, err := misc.CreateTMPDIR() + if err != nil { + return nil, fmt.Errorf("failed to create temporary directory: %v", err) + } + + // Define a local directory name within the temp directory + localTmpDirName := fmt.Sprintf("/%s/", misc.RudderAsyncDestinationLogs) + + // Combine the temporary directory with the local directory name and generate a unique file path + path := filepath.Join(tmpDirPath, localTmpDirName, uuid.NewString()) + csvFilePath := fmt.Sprintf("%v.csv", path) + + // Initialize the CSV writer with the generated file path + actionFile, err := createCSVWriter(csvFilePath) + if err != nil { + return nil, err + } + defer actionFile.File.Close() // Ensure the file is closed when done + + // Store the CSV file path in the ActionFileInfo struct + actionFile.CSVFilePath = csvFilePath + + // Create a scanner to read the existing file line by line + existingFile, err := os.Open(existingFilePath) + if err != nil { + return nil, fmt.Errorf("failed to open existing file: %v", err) + } + defer existingFile.Close() + + scanner := bufio.NewScanner(existingFile) + scanner.Buffer(nil, 50000*1024) // Adjust the buffer size if necessary + + for scanner.Scan() { + line := scanner.Text() + var data Data + if err := jsoniter.Unmarshal([]byte(line), &data); err != nil { + // Collect the failed job ID + actionFile.FailedJobIDs = append(actionFile.FailedJobIDs, data.Metadata.JobID) + continue + } + + // Calculate the payload size and observe it + payloadSizeStat := u.statsFactory.NewTaggedStat("payload_size", stats.HistogramType, + map[string]string{ + "module": "batch_router", + "destType": u.destName, + }) + payloadSizeStat.Observe(float64(len(data.Message.Fields))) + + // Populate the CSV file and collect success/failure job IDs + err := u.PopulateCsvFile(actionFile, line, data) + if err != nil { + actionFile.FailedJobIDs = append(actionFile.FailedJobIDs, data.Metadata.JobID) + } + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("error while scanning file: %v", err) + } + + // After processing, calculate the final file size + fileInfo, err := os.Stat(actionFile.CSVFilePath) + if err != nil { + return nil, fmt.Errorf("failed to retrieve file info: %v", err) + } + actionFile.FileSize = fileInfo.Size() + + return actionFile, nil +} diff --git a/router/batchrouter/asyncdestinationmanager/common/common.go b/router/batchrouter/asyncdestinationmanager/common/common.go index 7ae3df5211..b87d40cd0d 100644 --- a/router/batchrouter/asyncdestinationmanager/common/common.go +++ b/router/batchrouter/asyncdestinationmanager/common/common.go @@ -114,6 +114,7 @@ type AsyncDestinationStruct struct { UploadMutex sync.RWMutex DestinationUploadURL string Destination *backendconfig.DestinationT + Connection *backendconfig.Connection Manager AsyncDestinationManager AttemptNums map[int64]int FirstAttemptedAts map[int64]time.Time diff --git a/router/batchrouter/asyncdestinationmanager/common/utils.go b/router/batchrouter/asyncdestinationmanager/common/utils.go index 8b541ea016..415e83e839 100644 --- a/router/batchrouter/asyncdestinationmanager/common/utils.go +++ b/router/batchrouter/asyncdestinationmanager/common/utils.go @@ -3,7 +3,7 @@ package common import "slices" var ( - asyncDestinations = []string{"MARKETO_BULK_UPLOAD", "BINGADS_AUDIENCE", "ELOQUA", "YANDEX_METRICA_OFFLINE_EVENTS", "BINGADS_OFFLINE_CONVERSIONS", "KLAVIYO_BULK_UPLOAD", "LYTICS_BULK_UPLOAD"} + asyncDestinations = []string{"MARKETO_BULK_UPLOAD", "BINGADS_AUDIENCE", "ELOQUA", "YANDEX_METRICA_OFFLINE_EVENTS", "BINGADS_OFFLINE_CONVERSIONS", "KLAVIYO_BULK_UPLOAD", "LYTICS_BULK_UPLOAD", "CLEVERTAP_SEGMENT"} sftpDestinations = []string{"SFTP"} ) diff --git a/router/batchrouter/asyncdestinationmanager/manager.go b/router/batchrouter/asyncdestinationmanager/manager.go index 884389b30c..11c82e526b 100644 --- a/router/batchrouter/asyncdestinationmanager/manager.go +++ b/router/batchrouter/asyncdestinationmanager/manager.go @@ -10,6 +10,7 @@ import ( backendconfig "github.com/rudderlabs/rudder-server/backend-config" bingadsaudience "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/bing-ads/audience" bingadsofflineconversions "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions" + clevertapSegment "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/clevertap_segment" "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/eloqua" "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/klaviyobulkupload" @@ -25,6 +26,7 @@ func newRegularManager( statsFactory stats.Stats, destination *backendconfig.DestinationT, backendConfig backendconfig.BackendConfig, + connection *backendconfig.Connection, ) (common.AsyncDestinationManager, error) { switch destination.DestinationDefinition.Name { case "BINGADS_AUDIENCE": @@ -41,6 +43,8 @@ func newRegularManager( return klaviyobulkupload.NewManager(logger, statsFactory, destination) case "LYTICS_BULK_UPLOAD": return lyticsBulkUpload.NewManager(logger, statsFactory, destination) + case "CLEVERTAP_SEGMENT": + return clevertapSegment.NewManager(logger, statsFactory, destination, connection) } return nil, errors.New("invalid destination type") } @@ -59,10 +63,11 @@ func NewManager( statsFactory stats.Stats, destination *backendconfig.DestinationT, backendConfig backendconfig.BackendConfig, + connection *backendconfig.Connection, ) (common.AsyncDestinationManager, error) { switch { case common.IsAsyncRegularDestination(destination.DestinationDefinition.Name): - return newRegularManager(conf, logger, statsFactory, destination, backendConfig) + return newRegularManager(conf, logger, statsFactory, destination, backendConfig, connection) case common.IsSFTPDestination(destination.DestinationDefinition.Name): return newSFTPManager(logger, statsFactory, destination) } diff --git a/router/batchrouter/handle_lifecycle.go b/router/batchrouter/handle_lifecycle.go index 97d1e10f02..8de1eda2e1 100644 --- a/router/batchrouter/handle_lifecycle.go +++ b/router/batchrouter/handle_lifecycle.go @@ -261,9 +261,9 @@ func (brt *Handle) Shutdown() { _ = brt.backgroundWait() } -func (brt *Handle) initAsyncDestinationStruct(destination *backendconfig.DestinationT) { +func (brt *Handle) initAsyncDestinationStruct(destination *backendconfig.DestinationT, connection *backendconfig.Connection) { _, ok := brt.asyncDestinationStruct[destination.ID] - manager, err := asyncdestinationmanager.NewManager(brt.conf, brt.logger.Child("asyncdestinationmanager"), stats.Default, destination, brt.backendConfig) + manager, err := asyncdestinationmanager.NewManager(brt.conf, brt.logger.Child("asyncdestinationmanager"), stats.Default, destination, brt.backendConfig, connection) if err != nil { brt.logger.Errorf("BRT: Error initializing async destination struct for %s destination: %v", destination.Name, err) destInitFailStat := stats.Default.NewTaggedStat("destination_initialization_fail", stats.CountType, map[string]string{ @@ -277,17 +277,18 @@ func (brt *Handle) initAsyncDestinationStruct(destination *backendconfig.Destina brt.asyncDestinationStruct[destination.ID] = &asynccommon.AsyncDestinationStruct{} } brt.asyncDestinationStruct[destination.ID].Destination = destination + brt.asyncDestinationStruct[destination.ID].Connection = connection brt.asyncDestinationStruct[destination.ID].Manager = manager } -func (brt *Handle) refreshDestination(destination backendconfig.DestinationT) { +func (brt *Handle) refreshDestination(destination backendconfig.DestinationT, connection backendconfig.Connection) { if asynccommon.IsAsyncDestination(destination.DestinationDefinition.Name) { asyncDestStruct, ok := brt.asyncDestinationStruct[destination.ID] if ok && asyncDestStruct.Destination != nil && asyncDestStruct.Destination.RevisionID == destination.RevisionID { return } - brt.initAsyncDestinationStruct(&destination) + brt.initAsyncDestinationStruct(&destination, &connection) } } @@ -395,7 +396,7 @@ func (brt *Handle) backendConfigSubscriber() { uploadIntervalMap[destination.ID] = brt.uploadInterval(destination.Config) } destinationsMap[destination.ID].Sources = append(destinationsMap[destination.ID].Sources, source) - brt.refreshDestination(destination) + brt.refreshDestination(destination, wConfig.Connections[destination.ID]) // initialize map to track encountered anonymousIds for a warehouse destination if warehouseutils.IDResolutionEnabled() && slices.Contains(warehouseutils.IdentityEnabledWarehouses, brt.destType) { From 0210ae495516807456b61f057b14137184360ac9 Mon Sep 17 00:00:00 2001 From: shrouti1507 Date: Sun, 8 Dec 2024 14:49:23 +0530 Subject: [PATCH 02/10] feat: clean up --- .../clevertap_segment/clevertapSegment.go | 58 +++++++++++++------ .../clevertap_segment/manager.go | 3 + .../clevertap_segment/types.go | 9 ++- 3 files changed, 48 insertions(+), 22 deletions(-) diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go index 52162bf86f..127185cfad 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go @@ -48,7 +48,7 @@ func (u *clevertapServiceImpl) getCleverTapEndpoint(region string) (string, erro } func (u *clevertapServiceImpl) getBulkApi(destConfig DestinationConfig) *clevertapServiceImpl { - endpoint, err := u.getCleverTapEndpoint(destConfig.region) + endpoint, err := u.getCleverTapEndpoint(destConfig.Region) if err != nil { return nil } @@ -84,7 +84,7 @@ func (u *clevertapServiceImpl) MakeHTTPRequest(data *HttpRequestData) ([]byte, i return body, res.StatusCode, err } -func (u *clevertapServiceImpl) UploadBulkFile(filePath string, presignedURL string) error { +func (u *clevertapServiceImpl) UploadBulkFile(filePath, presignedURL string) error { // Open the file file, err := os.Open(filePath) if err != nil { @@ -120,12 +120,10 @@ func (u *clevertapServiceImpl) UploadBulkFile(filePath string, presignedURL stri body, _ := io.ReadAll(resp.Body) return fmt.Errorf("upload failed, status: %s, response: %s", resp.Status, string(body)) } - - fmt.Println("CSV file uploaded successfully!") return nil } -func (u *clevertapServiceImpl) getPresignedS3URL(appKey string, accessToken string) (string, error) { +func (u *clevertapServiceImpl) getPresignedS3URL(appKey, accessToken string) (string, error) { data := &HttpRequestData{ Method: http.MethodPost, Endpoint: u.BulkApi, @@ -134,7 +132,7 @@ func (u *clevertapServiceImpl) getPresignedS3URL(appKey string, accessToken stri accessToken: accessToken, } - body, _, err := u.MakeHTTPRequest(data) + body, statusCode, err := u.MakeHTTPRequest(data) if err != nil { return "", err } @@ -142,13 +140,23 @@ func (u *clevertapServiceImpl) getPresignedS3URL(appKey string, accessToken stri // Parse the response var result struct { PresignedS3URL string `json:"presignedS3URL"` + Expiry string `json:"expiry"` + Status string `json:"status"` + Error string `json:"error"` + Code int `json:"code"` } if err := json.Unmarshal(body, &result); err != nil { return "", err } + if statusCode != 200 { + err := fmt.Errorf("Error while fetching preSignedUrl: %s", result.Error) + return "", err + } + if result.PresignedS3URL == "" { - return "", fmt.Errorf("presigned URL is empty after parsing") + err := fmt.Errorf("presigned URL is empty after parsing") + return "", err } return result.PresignedS3URL, nil @@ -175,7 +183,7 @@ func (u *clevertapServiceImpl) convertToConnectionConfig(conn *backendconfig.Con // Set default SenderName if it is empty if connConfig.Config.Destination.SenderName == "" { - connConfig.Config.Destination.SenderName = "Rudderstack" + connConfig.Config.Destination.SenderName = DEFAULT_SENDER_NAME } return &connConfig, nil @@ -191,7 +199,7 @@ func (u *clevertapServiceImpl) namingSegment(destination *backendconfig.Destinat "filename": csvFilePath, "creator": u.ConnectionConfig.Config.Destination.SenderName, "url": presignedURL, - "replace": false, + "replace": true, } payloadBytes, err := json.Marshal(payload) @@ -210,17 +218,36 @@ func (u *clevertapServiceImpl) namingSegment(destination *backendconfig.Destinat } // Use MakeHTTPRequest to send the request - _, _, err = u.MakeHTTPRequest(data) + body, statusCode, err := u.MakeHTTPRequest(data) if err != nil { return err } + // Parse the response + var result struct { + SegmentID int `json:"Segment ID"` + Status string `json:"status"` + Error string `json:"error"` + Code int `json:"code"` + } + if err := json.Unmarshal(body, &result); err != nil { + return err + } + + if statusCode != 200 { + err := fmt.Errorf("Error while namimng segment: %s", result.Error) + return err + } + + if result.SegmentID == 0 { + err := fmt.Errorf("Segment Creation is Unsuccessful") + return err + } return nil } func (u *ClevertapBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStruct) common.AsyncUploadOutput { destination := asyncDestStruct.Destination - // connection := asyncDestStruct.Connection filePath := asyncDestStruct.FileName var failedJobs []int64 var successJobs []int64 @@ -267,7 +294,6 @@ func (u *ClevertapBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS uploadTimeStat.Since(startTime) if errorDuringUpload != nil { - u.logger.Error("error in uploading the bulk file: %v", errorDuringUpload) failedJobs = append(append(failedJobs, actionFiles.SuccessfulJobIDs...), actionFiles.FailedJobIDs...) // remove the file that could not be uploaded err = os.Remove(actionFiles.CSVFilePath) @@ -294,12 +320,10 @@ func (u *ClevertapBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS errorDuringNaming := u.service.namingSegment(destination, presignedURL, actionFiles.CSVFilePath, u.appKey, u.accessToken) if errorDuringNaming != nil { - // Handle error appropriately, e.g., log it or return it - u.logger.Error("Error during naming segment: %v", errorDuringNaming) return common.AsyncUploadOutput{ - FailedJobIDs: append(asyncDestStruct.FailedJobIDs, failedJobs...), - FailedReason: fmt.Sprintf("Error during naming segment: %v", errorDuringNaming.Error()), - FailedCount: len(asyncDestStruct.FailedJobIDs) + len(failedJobs), + AbortCount: len(asyncDestStruct.ImportingJobIDs), + AbortJobIDs: asyncDestStruct.ImportingJobIDs, + AbortReason: fmt.Sprintf("%s %v", "Error while creating the segment", err.Error()), DestinationID: destination.ID, } } diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go index 4513cb277e..34a6f0df00 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go @@ -42,6 +42,9 @@ func NewManager(logger logger.Logger, statsFactory stats.Stats, destination *bac clevertapService := &clevertapServiceImpl{} clevertapImpl := clevertapService.getBulkApi(destConfig) clevertapConnectionConfig, err := clevertapService.convertToConnectionConfig(connection) + if err != nil { + return nil, fmt.Errorf("error converting to connection config for clevertap segment: %v", err) + } return common.SimpleAsyncDestinationManager{ UploaderAndTransformer: NewClevertapBulkUploader(logger, statsFactory, destName, destConfig.AccessToken, destConfig.AppKey, clevertapImpl.BulkApi, clevertapService, clevertapConnectionConfig), diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go index d4ca60c570..38f803dc61 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go @@ -1,7 +1,6 @@ package clevertapSegment import ( - "context" "encoding/csv" "encoding/json" "io" @@ -32,7 +31,7 @@ type ClevertapBulkUploader struct { type DestinationConfig struct { AppKey string `json:"appKey"` AccessToken string `json:"accessToken"` - region string `json:"region"` + Region string `json:"region"` OneTrustCookieCategories []OneTrustCookieCategory `json:"oneTrustCookieCategories"` } @@ -78,6 +77,8 @@ type Data struct { Metadata Metadata `json:"metadata"` } +const DEFAULT_SENDER_NAME = "Rudderstack" + type ConnectionConfig struct { SourceID string `json:"sourceId"` DestinationID string `json:"destinationId"` @@ -94,10 +95,8 @@ type ConnectionConfig struct { type Uploader interface { Upload(*common.AsyncDestinationStruct) common.AsyncUploadOutput - UploadBulkFile(ctx context.Context, filePath string) (bool, error) PopulateCsvFile(actionFile *ActionFileInfo, line string, data Data) error convertToConnectionConfig(conn *backendconfig.Connection) (*ConnectionConfig, error) - namingSegment(destination *backendconfig.DestinationT, presignedURL, csvFilePath string) error } type HttpClient interface { @@ -105,7 +104,7 @@ type HttpClient interface { } type clevertapService interface { - UploadBulkFile(filePath string, presignedURL string) error + UploadBulkFile(filePath, presignedURL string) error MakeHTTPRequest(data *HttpRequestData) ([]byte, int, error) getPresignedS3URL(string, string) (string, error) namingSegment(destination *backendconfig.DestinationT, presignedURL, csvFilePath, appKey, accessToken string) error From c505e9d164eb5e119df643e99d7e8ee7d2a177bb Mon Sep 17 00:00:00 2001 From: shrouti1507 Date: Tue, 10 Dec 2024 02:13:08 +0530 Subject: [PATCH 03/10] feat: test case addition and refactoring --- .../clevertap_segment_mock.go | 71 ++++++ .../clevertap_segment/clevertapSegment.go | 46 ++-- .../clevertap_segment/clevertap_suite_test.go | 13 + .../clevertap_segment/clevertap_test.go | 222 ++++++++++++++++++ .../clevertap_segment/manager.go | 9 +- .../clevertap_segment/testdata/uploadData.txt | 5 + .../clevertap_segment/types.go | 37 +-- .../clevertap_segment/utils.go | 14 +- 8 files changed, 372 insertions(+), 45 deletions(-) create mode 100644 mocks/router/clevertap_segment/clevertap_segment_mock.go create mode 100644 router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_suite_test.go create mode 100644 router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go create mode 100644 router/batchrouter/asyncdestinationmanager/clevertap_segment/testdata/uploadData.txt diff --git a/mocks/router/clevertap_segment/clevertap_segment_mock.go b/mocks/router/clevertap_segment/clevertap_segment_mock.go new file mode 100644 index 0000000000..ffc44d75f8 --- /dev/null +++ b/mocks/router/clevertap_segment/clevertap_segment_mock.go @@ -0,0 +1,71 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/clevertap_segment (interfaces: ClevertapService) +// +// Generated by this command: +// +// mockgen -destination=./clevertap_segment_mock.go -package=mocks github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/clevertap_segment ClevertapService +// + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + clevertapSegment "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/clevertap_segment" + gomock "go.uber.org/mock/gomock" +) + +// MockClevertapService is a mock of ClevertapService interface. +type MockClevertapService struct { + ctrl *gomock.Controller + recorder *MockClevertapServiceMockRecorder + isgomock struct{} +} + +// MockClevertapServiceMockRecorder is the mock recorder for MockClevertapService. +type MockClevertapServiceMockRecorder struct { + mock *MockClevertapService +} + +// NewMockClevertapService creates a new mock instance. +func NewMockClevertapService(ctrl *gomock.Controller) *MockClevertapService { + mock := &MockClevertapService{ctrl: ctrl} + mock.recorder = &MockClevertapServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockClevertapService) EXPECT() *MockClevertapServiceMockRecorder { + return m.recorder +} + +// MakeHTTPRequest mocks base method. +func (m *MockClevertapService) MakeHTTPRequest(data *clevertapSegment.HttpRequestData) ([]byte, int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MakeHTTPRequest", data) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(int) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// MakeHTTPRequest indicates an expected call of MakeHTTPRequest. +func (mr *MockClevertapServiceMockRecorder) MakeHTTPRequest(data any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MakeHTTPRequest", reflect.TypeOf((*MockClevertapService)(nil).MakeHTTPRequest), data) +} + +// UploadBulkFile mocks base method. +func (m *MockClevertapService) UploadBulkFile(filePath, presignedURL string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UploadBulkFile", filePath, presignedURL) + ret0, _ := ret[0].(error) + return ret0 +} + +// UploadBulkFile indicates an expected call of UploadBulkFile. +func (mr *MockClevertapServiceMockRecorder) UploadBulkFile(filePath, presignedURL any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UploadBulkFile", reflect.TypeOf((*MockClevertapService)(nil).UploadBulkFile), filePath, presignedURL) +} diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go index 127185cfad..e2b56de92d 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go @@ -17,14 +17,14 @@ import ( "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" ) -type clevertapServiceImpl struct { +type ClevertapServiceImpl struct { BulkApi string NotifyApi string ConnectionConfig *ConnectionConfig } // GetCleverTapEndpoint returns the API endpoint for the given region -func (u *clevertapServiceImpl) getCleverTapEndpoint(region string) (string, error) { +func (u *ClevertapServiceImpl) getCleverTapEndpoint(region string) (string, error) { // Mapping of regions to endpoints endpoints := map[string]string{ "IN": "in1.api.clevertap.com", @@ -47,12 +47,12 @@ func (u *clevertapServiceImpl) getCleverTapEndpoint(region string) (string, erro return "", fmt.Errorf("unknown region: %s", region) } -func (u *clevertapServiceImpl) getBulkApi(destConfig DestinationConfig) *clevertapServiceImpl { +func (u *ClevertapServiceImpl) getBulkApi(destConfig DestinationConfig) *ClevertapServiceImpl { endpoint, err := u.getCleverTapEndpoint(destConfig.Region) if err != nil { return nil } - return &clevertapServiceImpl{ + return &ClevertapServiceImpl{ BulkApi: fmt.Sprintf("https://%s/get_custom_list_segment_url", endpoint), NotifyApi: fmt.Sprintf("https://%s/upload_custom_list_segment_completed", endpoint), } @@ -62,7 +62,7 @@ func (*ClevertapBulkUploader) Transform(job *jobsdb.JobT) (string, error) { return common.GetMarshalledData(string(job.EventPayload), job.JobID) } -func (u *clevertapServiceImpl) MakeHTTPRequest(data *HttpRequestData) ([]byte, int, error) { +func (u *ClevertapServiceImpl) MakeHTTPRequest(data *HttpRequestData) ([]byte, int, error) { req, err := http.NewRequest(data.Method, data.Endpoint, data.Body) if err != nil { return nil, 500, err @@ -84,7 +84,7 @@ func (u *clevertapServiceImpl) MakeHTTPRequest(data *HttpRequestData) ([]byte, i return body, res.StatusCode, err } -func (u *clevertapServiceImpl) UploadBulkFile(filePath, presignedURL string) error { +func (u *ClevertapServiceImpl) UploadBulkFile(filePath, presignedURL string) error { // Open the file file, err := os.Open(filePath) if err != nil { @@ -99,8 +99,6 @@ func (u *clevertapServiceImpl) UploadBulkFile(filePath, presignedURL string) err } fileSize := fileInfo.Size() - fmt.Printf("Uploading file: %s, Size: %d bytes\n", fileInfo.Name(), fileSize) - // Create the PUT request req, err := http.NewRequest("PUT", presignedURL, file) if err != nil { @@ -123,16 +121,16 @@ func (u *clevertapServiceImpl) UploadBulkFile(filePath, presignedURL string) err return nil } -func (u *clevertapServiceImpl) getPresignedS3URL(appKey, accessToken string) (string, error) { +func (u *ClevertapBulkUploader) getPresignedS3URL(appKey, accessToken string, clevertapService ClevertapService) (string, error) { data := &HttpRequestData{ Method: http.MethodPost, - Endpoint: u.BulkApi, + Endpoint: u.presignedURLEndpoint, ContentType: "application/json", appKey: appKey, accessToken: accessToken, } - body, statusCode, err := u.MakeHTTPRequest(data) + body, statusCode, err := u.service.MakeHTTPRequest(data) if err != nil { return "", err } @@ -163,7 +161,7 @@ func (u *clevertapServiceImpl) getPresignedS3URL(appKey, accessToken string) (st } // Function to convert *backendconfig.Connection to ConnectionConfig using marshal and unmarshal -func (u *clevertapServiceImpl) convertToConnectionConfig(conn *backendconfig.Connection) (*ConnectionConfig, error) { +func (u *ClevertapServiceImpl) convertToConnectionConfig(conn *backendconfig.Connection) (*ConnectionConfig, error) { if conn == nil { return nil, fmt.Errorf("connection is nil") } @@ -189,15 +187,15 @@ func (u *clevertapServiceImpl) convertToConnectionConfig(conn *backendconfig.Con return &connConfig, nil } -func (u *clevertapServiceImpl) namingSegment(destination *backendconfig.DestinationT, presignedURL, csvFilePath, appKey, accessToken string) error { - url := u.NotifyApi +func (u *ClevertapBulkUploader) namingSegment(destination *backendconfig.DestinationT, presignedURL, csvFilePath, appKey, accessToken string, clevertapService ClevertapService) error { + url := u.notifyEndpoint // Construct the request payload payload := map[string]interface{}{ - "name": u.ConnectionConfig.Config.Destination.SegmentName, - "email": u.ConnectionConfig.Config.Destination.AdminEmail, + "name": u.clevertapConnectionConfig.Config.Destination.SegmentName, + "email": u.clevertapConnectionConfig.Config.Destination.AdminEmail, "filename": csvFilePath, - "creator": u.ConnectionConfig.Config.Destination.SenderName, + "creator": u.clevertapConnectionConfig.Config.Destination.SenderName, "url": presignedURL, "replace": true, } @@ -218,7 +216,7 @@ func (u *clevertapServiceImpl) namingSegment(destination *backendconfig.Destinat } // Use MakeHTTPRequest to send the request - body, statusCode, err := u.MakeHTTPRequest(data) + body, statusCode, err := u.service.MakeHTTPRequest(data) if err != nil { return err } @@ -273,7 +271,7 @@ func (u *ClevertapBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS "destType": u.destName, }) - presignedURL, urlErr := u.service.getPresignedS3URL(u.appKey, u.accessToken) + presignedURL, urlErr := u.getPresignedS3URL(u.appKey, u.accessToken, u.service) if urlErr != nil { eventsAbortedStat := u.statsFactory.NewTaggedStat("failed_job_count", stats.CountType, map[string]string{ @@ -294,7 +292,13 @@ func (u *ClevertapBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS uploadTimeStat.Since(startTime) if errorDuringUpload != nil { - failedJobs = append(append(failedJobs, actionFiles.SuccessfulJobIDs...), actionFiles.FailedJobIDs...) + failedJobs = append(failedJobs, actionFiles.SuccessfulJobIDs...) + + // Append only failed job IDs if they exist + if len(actionFiles.FailedJobIDs) > 0 { + fmt.Println("Here") + failedJobs = append(failedJobs, actionFiles.FailedJobIDs...) + } // remove the file that could not be uploaded err = os.Remove(actionFiles.CSVFilePath) if err != nil { @@ -317,7 +321,7 @@ func (u *ClevertapBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS failedJobs = append(failedJobs, actionFiles.FailedJobIDs...) successJobs = append(successJobs, actionFiles.SuccessfulJobIDs...) - errorDuringNaming := u.service.namingSegment(destination, presignedURL, actionFiles.CSVFilePath, u.appKey, u.accessToken) + errorDuringNaming := u.namingSegment(destination, presignedURL, actionFiles.CSVFilePath, u.appKey, u.accessToken, u.service) if errorDuringNaming != nil { return common.AsyncUploadOutput{ diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_suite_test.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_suite_test.go new file mode 100644 index 0000000000..e0dd596355 --- /dev/null +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_suite_test.go @@ -0,0 +1,13 @@ +package clevertapSegment_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestClevertapSegment(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "clevertap_segment Suite") +} diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go new file mode 100644 index 0000000000..03d080e4e2 --- /dev/null +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go @@ -0,0 +1,222 @@ +package clevertapSegment_test + +import ( + "os" + "sync" + "fmt" + "path/filepath" + + "go.uber.org/mock/gomock" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/rudderlabs/rudder-go-kit/stats" + + "github.com/rudderlabs/rudder-go-kit/bytesize" + "github.com/rudderlabs/rudder-go-kit/config" + "github.com/rudderlabs/rudder-go-kit/logger" + + backendconfig "github.com/rudderlabs/rudder-server/backend-config" + + mocks "github.com/rudderlabs/rudder-server/mocks/router/clevertap_segment" + ClevertapSegment "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/clevertap_segment" + "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" + "github.com/rudderlabs/rudder-server/utils/misc" +) + +var ( + once sync.Once + destination = backendconfig.DestinationT{ + Name: "CLEVERTAP_SEGMENT", + Config: map[string]interface{}{ + "clevertapAccountId": "1234", + "clevertapAccountKey": "1234567", + }, + WorkspaceID: "workspace_id", + } + currentDir, _ = os.Getwd() +) + +func initClevertap() { + once.Do(func() { + logger.Reset() + misc.Init() + }) +} + +var _ = Describe("CLEVERTAP_SEGMENT test", func() { + Context("When uploading the file", func() { + BeforeEach(func() { + config.Reset() + config.Set("BatchRouter.CLEVERTAP_SEGMENT.MaxUploadLimit", 1*bytesize.MB) + + // Ensure the log directory exists + logDir := "/tmp/rudder-async-destination-logs" + os.MkdirAll(logDir, os.ModePerm) // Create the directory if it doesn't exist + }) + + AfterEach(func() { + config.Reset() + }) + + // It("TestClevertapUploadWrongFilepath", func() { + // initClevertap() + // ctrl := gomock.NewController(GinkgoT()) + // defer ctrl.Finish() + + // clevertapService := mocks.NewMockClevertapService(ctrl) + // clevertapServiceImpl := ClevertapSegment.ClevertapServiceImpl{ + // BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", + // NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", + // ConnectionConfig: &ClevertapSegment.ConnectionConfig{ + // SourceId: "source123", + // DestinationId: "destination456", + // Enabled: true, + // Config: ClevertapSegment.ConnConfig{ + // Destination: ClevertapSegment.Destination{ + // SchemaVersion: "v1.0", + // SegmentName: "User Segment A", + // AdminEmail: "admin@example.com", + // SenderName: "Rudderstack", + // }, + // }, + // }, + // } + // bulkUploader := common.SimpleAsyncDestinationManager{UploaderAndTransformer: ClevertapSegment.NewClevertapBulkUploader(logger.NOP, stats.NOP, "CLEVERTAP_SEGMENT", destination.Config["clevertapAccountKey"].(string), destination.Config["clevertapAccountId"].(string), &clevertapServiceImpl, clevertapService, clevertapServiceImpl.ConnectionConfig)} + // asyncDestination := common.AsyncDestinationStruct{ + // ImportingJobIDs: []int64{1, 2, 3, 4}, + // FailedJobIDs: []int64{}, + // FileName: "", + // Destination: &destination, + // } + + // expected := common.AsyncUploadOutput{ + // FailedReason: "got error while transforming the file. failed to open existing file", + // ImportingJobIDs: nil, + // FailedJobIDs: []int64{1, 2, 3, 4}, + // ImportingCount: 0, + // FailedCount: 4, + // } + + // received := bulkUploader.Upload(&asyncDestination) + // Expect(received).To(Equal(expected)) + // }) + + It("TestClevertapErrorWhileUploadingData", func() { + initClevertap() + ctrl := gomock.NewController(GinkgoT()) + defer ctrl.Finish() + + clevertapService := mocks.NewMockClevertapService(ctrl) + clevertapServiceImpl := ClevertapSegment.ClevertapServiceImpl{ + BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", + NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", + ConnectionConfig: &ClevertapSegment.ConnectionConfig{ + SourceId: "source123", + DestinationId: "destination456", + Enabled: true, + Config: ClevertapSegment.ConnConfig{ + Destination: ClevertapSegment.Destination{ + SchemaVersion: "v1.0", + SegmentName: "User Segment A", + AdminEmail: "admin@example.com", + SenderName: "Rudderstack", + }, + }, + }, + } + + bulkUploader := common.SimpleAsyncDestinationManager{UploaderAndTransformer: ClevertapSegment.NewClevertapBulkUploader(logger.NOP, stats.NOP, "CLEVERTAP_SEGMENT", destination.Config["clevertapAccountKey"].(string), destination.Config["clevertapAccountId"].(string), &clevertapServiceImpl, clevertapService, clevertapServiceImpl.ConnectionConfig)} + + asyncDestination := common.AsyncDestinationStruct{ + ImportingJobIDs: []int64{1, 2, 3, 4}, + FailedJobIDs: []int64{}, + FileName: filepath.Join(currentDir, "testdata/uploadData.txt"), + Destination: &destination, + } + + // Mock handling for MakeHTTPRequest + clevertapService.EXPECT(). + MakeHTTPRequest(gomock.Any()). + Return([]byte(`{"presignedS3URL": "https://abc.com", "expiry": "2023-12-31T23:59:59Z", "status": "success", "code": 200}`), 200, nil). + Times(1) + + // Mock expectations + clevertapService.EXPECT(). + UploadBulkFile(gomock.Any(), "https://abc.com"). + Return(fmt.Errorf("Upload failed with status code: 400")). + Times(1) + + expected := common.AsyncUploadOutput{ + FailedReason: "error in uploading the bulk file: Upload failed with status code: 400", + ImportingJobIDs: nil, + FailedJobIDs: []int64{1, 2, 3, 4}, + ImportingCount: 0, + FailedCount: 4, + } + + received := bulkUploader.Upload(&asyncDestination) + Expect(received).To(Equal(expected)) + }) + + It("TestSuccessfulClevertapUpload", func() { + initClevertap() + ctrl := gomock.NewController(GinkgoT()) + defer ctrl.Finish() + + clevertapService := mocks.NewMockClevertapService(ctrl) + clevertapServiceImpl := ClevertapSegment.ClevertapServiceImpl{ + BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", + NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", + ConnectionConfig: &ClevertapSegment.ConnectionConfig{ + SourceId: "source123", + DestinationId: "destination456", + Enabled: true, + Config: ClevertapSegment.ConnConfig{ + Destination: ClevertapSegment.Destination{ + SchemaVersion: "v1.0", + SegmentName: "User Segment A", + AdminEmail: "admin@example.com", + SenderName: "Rudderstack", + }, + }, + }, + } + bulkUploader := common.SimpleAsyncDestinationManager{UploaderAndTransformer: ClevertapSegment.NewClevertapBulkUploader(logger.NOP, stats.NOP, "CLEVERTAP_SEGMENT", destination.Config["clevertapAccountKey"].(string), destination.Config["clevertapAccountId"].(string), &clevertapServiceImpl, clevertapService, clevertapServiceImpl.ConnectionConfig)} + asyncDestination := common.AsyncDestinationStruct{ + ImportingJobIDs: []int64{1, 2, 3, 4}, + FailedJobIDs: []int64{}, + FileName: filepath.Join(currentDir, "testdata/uploadData.txt"), + Destination: &destination, + } + + // Mock handling for MakeHTTPRequest + clevertapService.EXPECT(). + MakeHTTPRequest(gomock.Any()). + Return([]byte(`{"presignedS3URL": "https://abc.com", "expiry": "2023-12-31T23:59:59Z", "status": "success", "code": 200}`), 200, nil). + Times(1) + + // Mock expectations for UploadBulkFile + clevertapService.EXPECT(). + UploadBulkFile(gomock.Any(), "https://abc.com"). + Return(nil). + Times(1) + + clevertapService.EXPECT(). + MakeHTTPRequest(gomock.Any()). + Return([]byte(`{"Segment ID": "1234", "status": "success", "code": 200}`), 200, nil). + Times(1) + + expected := common.AsyncUploadOutput{ + ImportingJobIDs: []int64{1, 2, 3, 4}, + FailedJobIDs: []int64{}, + ImportingCount: 4, + FailedCount: 0, + } + + received := bulkUploader.Upload(&asyncDestination) + Expect(received).To(Equal(expected)) + }) + + }) +}) diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go index 34a6f0df00..c95cab1b36 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go @@ -12,14 +12,15 @@ import ( "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" ) -func NewClevertapBulkUploader(logger logger.Logger, statsFactory stats.Stats, destinationName, accessToken, appKey, endpoint string, clevertap clevertapService, connectionConfig *ConnectionConfig) common.AsyncUploadAndTransformManager { +func NewClevertapBulkUploader(logger logger.Logger, statsFactory stats.Stats, destinationName, accessToken, appKey string, clevertapEndpoints *ClevertapServiceImpl, clevertap ClevertapService, connectionConfig *ConnectionConfig) common.AsyncUploadAndTransformManager { return &ClevertapBulkUploader{ destName: destinationName, logger: logger.Child("Clevertap").Child("ClevertapBulkUploader"), statsFactory: statsFactory, accessToken: accessToken, appKey: appKey, - baseEndpoint: endpoint, + presignedURLEndpoint: clevertapEndpoints.BulkApi, + notifyEndpoint: clevertapEndpoints.NotifyApi, fileSizeLimit: common.GetBatchRouterConfigInt64("MaxUploadLimit", destinationName, 5*bytesize.GB), jobToCSVMap: map[int64]int64{}, service: clevertap, @@ -39,7 +40,7 @@ func NewManager(logger logger.Logger, statsFactory stats.Stats, destination *bac } destName := destination.DestinationDefinition.Name - clevertapService := &clevertapServiceImpl{} + clevertapService := &ClevertapServiceImpl{} clevertapImpl := clevertapService.getBulkApi(destConfig) clevertapConnectionConfig, err := clevertapService.convertToConnectionConfig(connection) if err != nil { @@ -47,6 +48,6 @@ func NewManager(logger logger.Logger, statsFactory stats.Stats, destination *bac } return common.SimpleAsyncDestinationManager{ - UploaderAndTransformer: NewClevertapBulkUploader(logger, statsFactory, destName, destConfig.AccessToken, destConfig.AppKey, clevertapImpl.BulkApi, clevertapService, clevertapConnectionConfig), + UploaderAndTransformer: NewClevertapBulkUploader(logger, statsFactory, destName, destConfig.AccessToken, destConfig.AppKey, clevertapImpl, clevertapService, clevertapConnectionConfig), }, nil } diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/testdata/uploadData.txt b/router/batchrouter/asyncdestinationmanager/clevertap_segment/testdata/uploadData.txt new file mode 100644 index 0000000000..a1b57c46f2 --- /dev/null +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/testdata/uploadData.txt @@ -0,0 +1,5 @@ +{"message":{"action":"insert","fields":{"i":"90478tyr3fgiwhbjwu98y3ruwhe","g":"12345"}},"metadata":{"job_id":1}} +{"message":{"action":"update","fields":{"i": "abc@xyz.com"}},"metadata":{"job_id":2}} +{"message":{"action":"update","fields":{"g": "6789"}},"metadata":{"job_id":3}} +{"message":{"action":"update","fields":{"g": "6543"}},"metadata":{"job_id":4}} + diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go index 38f803dc61..6cac4cf80a 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go @@ -21,10 +21,11 @@ type ClevertapBulkUploader struct { statsFactory stats.Stats appKey string accessToken string - baseEndpoint string + presignedURLEndpoint string + notifyEndpoint string fileSizeLimit int64 jobToCSVMap map[int64]int64 - service clevertapService + service ClevertapService clevertapConnectionConfig *ConnectionConfig } @@ -79,33 +80,37 @@ type Data struct { const DEFAULT_SENDER_NAME = "Rudderstack" +type Destination struct { + SchemaVersion string `json:"schemaVersion"` + SegmentName string `json:"segmentName"` + AdminEmail string `json:"adminEmail"` + SenderName string `json:"senderName"` +} + +type ConnConfig struct { + Destination Destination `json:"destination"` +} + type ConnectionConfig struct { - SourceID string `json:"sourceId"` - DestinationID string `json:"destinationId"` - Enabled bool `json:"enabled"` - Config struct { - Destination struct { - SchemaVersion string `json:"schemaVersion"` - SegmentName string `json:"segmentName"` - AdminEmail string `json:"adminEmail"` - SenderName string `json:"senderName"` - } `json:"destination"` - } `json:"config"` + SourceId string `json:"sourceId"` + DestinationId string `json:"destinationId"` + Enabled bool `json:"enabled"` + Config ConnConfig `json:"config"` } type Uploader interface { Upload(*common.AsyncDestinationStruct) common.AsyncUploadOutput PopulateCsvFile(actionFile *ActionFileInfo, line string, data Data) error convertToConnectionConfig(conn *backendconfig.Connection) (*ConnectionConfig, error) + getPresignedS3URL(string, string, ClevertapService) (string, error) + namingSegment(destination *backendconfig.DestinationT, presignedURL, csvFilePath, appKey, accessToken string, clevertapService ClevertapService) error } type HttpClient interface { Do(req *http.Request) (*http.Response, error) } -type clevertapService interface { +type ClevertapService interface { UploadBulkFile(filePath, presignedURL string) error MakeHTTPRequest(data *HttpRequestData) ([]byte, int, error) - getPresignedS3URL(string, string) (string, error) - namingSegment(destination *backendconfig.DestinationT, presignedURL, csvFilePath, appKey, accessToken string) error } diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go index bd25df59cc..d07063c3b2 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go @@ -53,6 +53,7 @@ func (u *ClevertapBulkUploader) PopulateCsvFile(actionFile *ActionFileInfo, line actionFile.CSVWriter.Flush() actionFile.SuccessfulJobIDs = append(actionFile.SuccessfulJobIDs, data.Metadata.JobID) } else { + // fmt.println("size exceeding") actionFile.FailedJobIDs = append(actionFile.FailedJobIDs, data.Metadata.JobID) } return nil @@ -103,7 +104,7 @@ func (u *ClevertapBulkUploader) createCSVFile(existingFilePath string) (*ActionF // Create a scanner to read the existing file line by line existingFile, err := os.Open(existingFilePath) if err != nil { - return nil, fmt.Errorf("failed to open existing file: %v", err) + return nil, fmt.Errorf("failed to open existing file") } defer existingFile.Close() @@ -114,8 +115,10 @@ func (u *ClevertapBulkUploader) createCSVFile(existingFilePath string) (*ActionF line := scanner.Text() var data Data if err := jsoniter.Unmarshal([]byte(line), &data); err != nil { - // Collect the failed job ID - actionFile.FailedJobIDs = append(actionFile.FailedJobIDs, data.Metadata.JobID) + // Collect the failed job ID only if it's valid + if data.Metadata.JobID != 0 { // Check if JobID is not zero + actionFile.FailedJobIDs = append(actionFile.FailedJobIDs, data.Metadata.JobID) + } continue } @@ -130,7 +133,10 @@ func (u *ClevertapBulkUploader) createCSVFile(existingFilePath string) (*ActionF // Populate the CSV file and collect success/failure job IDs err := u.PopulateCsvFile(actionFile, line, data) if err != nil { - actionFile.FailedJobIDs = append(actionFile.FailedJobIDs, data.Metadata.JobID) + // Collect the failed job ID only if it's valid + if data.Metadata.JobID != 0 { // Check if JobID is not zero + actionFile.FailedJobIDs = append(actionFile.FailedJobIDs, data.Metadata.JobID) + } } } From bfef1b2dfecc8171fc6eebb273689f01ad6ad624 Mon Sep 17 00:00:00 2001 From: shrouti1507 Date: Tue, 10 Dec 2024 13:00:49 +0530 Subject: [PATCH 04/10] feat: correcting test cases --- .../clevertap_segment/clevertapSegment.go | 12 +- .../clevertap_segment/clevertap_test.go | 164 ++++++++++++------ 2 files changed, 118 insertions(+), 58 deletions(-) diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go index e2b56de92d..cf39e195de 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go @@ -294,11 +294,11 @@ func (u *ClevertapBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS if errorDuringUpload != nil { failedJobs = append(failedJobs, actionFiles.SuccessfulJobIDs...) - // Append only failed job IDs if they exist - if len(actionFiles.FailedJobIDs) > 0 { - fmt.Println("Here") - failedJobs = append(failedJobs, actionFiles.FailedJobIDs...) - } + // Append only failed job IDs if they exist + if len(actionFiles.FailedJobIDs) > 0 { + fmt.Println("Here") + failedJobs = append(failedJobs, actionFiles.FailedJobIDs...) + } // remove the file that could not be uploaded err = os.Remove(actionFiles.CSVFilePath) if err != nil { @@ -327,7 +327,7 @@ func (u *ClevertapBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS return common.AsyncUploadOutput{ AbortCount: len(asyncDestStruct.ImportingJobIDs), AbortJobIDs: asyncDestStruct.ImportingJobIDs, - AbortReason: fmt.Sprintf("%s %v", "Error while creating the segment", err.Error()), + AbortReason: fmt.Sprintf("%s %v", "Error while creating the segment", errorDuringNaming), DestinationID: destination.ID, } } diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go index 03d080e4e2..0901c84098 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go @@ -1,10 +1,10 @@ package clevertapSegment_test import ( + "fmt" "os" + "path/filepath" "sync" - "fmt" - "path/filepath" "go.uber.org/mock/gomock" @@ -59,48 +59,48 @@ var _ = Describe("CLEVERTAP_SEGMENT test", func() { config.Reset() }) - // It("TestClevertapUploadWrongFilepath", func() { - // initClevertap() - // ctrl := gomock.NewController(GinkgoT()) - // defer ctrl.Finish() - - // clevertapService := mocks.NewMockClevertapService(ctrl) - // clevertapServiceImpl := ClevertapSegment.ClevertapServiceImpl{ - // BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", - // NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", - // ConnectionConfig: &ClevertapSegment.ConnectionConfig{ - // SourceId: "source123", - // DestinationId: "destination456", - // Enabled: true, - // Config: ClevertapSegment.ConnConfig{ - // Destination: ClevertapSegment.Destination{ - // SchemaVersion: "v1.0", - // SegmentName: "User Segment A", - // AdminEmail: "admin@example.com", - // SenderName: "Rudderstack", - // }, - // }, - // }, - // } - // bulkUploader := common.SimpleAsyncDestinationManager{UploaderAndTransformer: ClevertapSegment.NewClevertapBulkUploader(logger.NOP, stats.NOP, "CLEVERTAP_SEGMENT", destination.Config["clevertapAccountKey"].(string), destination.Config["clevertapAccountId"].(string), &clevertapServiceImpl, clevertapService, clevertapServiceImpl.ConnectionConfig)} - // asyncDestination := common.AsyncDestinationStruct{ - // ImportingJobIDs: []int64{1, 2, 3, 4}, - // FailedJobIDs: []int64{}, - // FileName: "", - // Destination: &destination, - // } - - // expected := common.AsyncUploadOutput{ - // FailedReason: "got error while transforming the file. failed to open existing file", - // ImportingJobIDs: nil, - // FailedJobIDs: []int64{1, 2, 3, 4}, - // ImportingCount: 0, - // FailedCount: 4, - // } - - // received := bulkUploader.Upload(&asyncDestination) - // Expect(received).To(Equal(expected)) - // }) + It("TestClevertapUploadWrongFilepath", func() { + initClevertap() + ctrl := gomock.NewController(GinkgoT()) + defer ctrl.Finish() + + clevertapService := mocks.NewMockClevertapService(ctrl) + clevertapServiceImpl := ClevertapSegment.ClevertapServiceImpl{ + BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", + NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", + ConnectionConfig: &ClevertapSegment.ConnectionConfig{ + SourceId: "source123", + DestinationId: "destination456", + Enabled: true, + Config: ClevertapSegment.ConnConfig{ + Destination: ClevertapSegment.Destination{ + SchemaVersion: "v1.0", + SegmentName: "User Segment A", + AdminEmail: "admin@example.com", + SenderName: "Rudderstack", + }, + }, + }, + } + bulkUploader := common.SimpleAsyncDestinationManager{UploaderAndTransformer: ClevertapSegment.NewClevertapBulkUploader(logger.NOP, stats.NOP, "CLEVERTAP_SEGMENT", destination.Config["clevertapAccountKey"].(string), destination.Config["clevertapAccountId"].(string), &clevertapServiceImpl, clevertapService, clevertapServiceImpl.ConnectionConfig)} + asyncDestination := common.AsyncDestinationStruct{ + ImportingJobIDs: []int64{1, 2, 3, 4}, + FailedJobIDs: []int64{}, + FileName: "", + Destination: &destination, + } + + expected := common.AsyncUploadOutput{ + FailedReason: "got error while transforming the file. failed to open existing file", + ImportingJobIDs: nil, + FailedJobIDs: []int64{1, 2, 3, 4}, + ImportingCount: 0, + FailedCount: 4, + } + + received := bulkUploader.Upload(&asyncDestination) + Expect(received).To(Equal(expected)) + }) It("TestClevertapErrorWhileUploadingData", func() { initClevertap() @@ -137,9 +137,9 @@ var _ = Describe("CLEVERTAP_SEGMENT test", func() { // Mock handling for MakeHTTPRequest clevertapService.EXPECT(). - MakeHTTPRequest(gomock.Any()). - Return([]byte(`{"presignedS3URL": "https://abc.com", "expiry": "2023-12-31T23:59:59Z", "status": "success", "code": 200}`), 200, nil). - Times(1) + MakeHTTPRequest(gomock.Any()). + Return([]byte(`{"presignedS3URL": "https://abc.com", "expiry": "2023-12-31T23:59:59Z", "status": "success", "code": 200}`), 200, nil). + Times(1) // Mock expectations clevertapService.EXPECT(). @@ -192,9 +192,9 @@ var _ = Describe("CLEVERTAP_SEGMENT test", func() { // Mock handling for MakeHTTPRequest clevertapService.EXPECT(). - MakeHTTPRequest(gomock.Any()). - Return([]byte(`{"presignedS3URL": "https://abc.com", "expiry": "2023-12-31T23:59:59Z", "status": "success", "code": 200}`), 200, nil). - Times(1) + MakeHTTPRequest(gomock.Any()). + Return([]byte(`{"presignedS3URL": "https://abc.com", "expiry": "2023-12-31T23:59:59Z", "status": "success", "code": 200}`), 200, nil). + Times(1) // Mock expectations for UploadBulkFile clevertapService.EXPECT(). @@ -202,9 +202,9 @@ var _ = Describe("CLEVERTAP_SEGMENT test", func() { Return(nil). Times(1) - clevertapService.EXPECT(). + clevertapService.EXPECT(). MakeHTTPRequest(gomock.Any()). - Return([]byte(`{"Segment ID": "1234", "status": "success", "code": 200}`), 200, nil). + Return([]byte(`{"Segment ID": 1234, "status": "success", "code": 200}`), 200, nil). Times(1) expected := common.AsyncUploadOutput{ @@ -218,5 +218,65 @@ var _ = Describe("CLEVERTAP_SEGMENT test", func() { Expect(received).To(Equal(expected)) }) + It("TestFailureClevertapUploadWhileNaming", func() { + initClevertap() + ctrl := gomock.NewController(GinkgoT()) + defer ctrl.Finish() + + clevertapService := mocks.NewMockClevertapService(ctrl) + clevertapServiceImpl := ClevertapSegment.ClevertapServiceImpl{ + BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", + NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", + ConnectionConfig: &ClevertapSegment.ConnectionConfig{ + SourceId: "source123", + DestinationId: "destination456", + Enabled: true, + Config: ClevertapSegment.ConnConfig{ + Destination: ClevertapSegment.Destination{ + SchemaVersion: "v1.0", + SegmentName: "User Segment A", + AdminEmail: "admin@example.com", + SenderName: "Rudderstack", + }, + }, + }, + } + bulkUploader := common.SimpleAsyncDestinationManager{UploaderAndTransformer: ClevertapSegment.NewClevertapBulkUploader(logger.NOP, stats.NOP, "CLEVERTAP_SEGMENT", destination.Config["clevertapAccountKey"].(string), destination.Config["clevertapAccountId"].(string), &clevertapServiceImpl, clevertapService, clevertapServiceImpl.ConnectionConfig)} + asyncDestination := common.AsyncDestinationStruct{ + ImportingJobIDs: []int64{1, 2, 3, 4}, + FailedJobIDs: []int64{}, + FileName: filepath.Join(currentDir, "testdata/uploadData.txt"), + Destination: &destination, + } + + // Mock handling for MakeHTTPRequest + clevertapService.EXPECT(). + MakeHTTPRequest(gomock.Any()). + Return([]byte(`{"presignedS3URL": "https://abc.com", "expiry": "2023-12-31T23:59:59Z", "status": "success", "code": 200}`), 200, nil). + Times(1) + + // Mock expectations for UploadBulkFile + clevertapService.EXPECT(). + UploadBulkFile(gomock.Any(), "https://abc.com"). + Return(nil). + Times(1) + + clevertapService.EXPECT(). + MakeHTTPRequest(gomock.Any()). + Return([]byte(`{"error": "Email id is either not in the right format or does not belong to a valid admin", "status": "fail", "code": 401}`), 401, nil). + Times(1) + + expected := common.AsyncUploadOutput{ + AbortJobIDs: []int64{1, 2, 3, 4}, + FailedJobIDs: nil, + AbortCount: 4, + FailedCount: 0, + AbortReason: "Error while creating the segment Error while namimng segment: Email id is either not in the right format or does not belong to a valid admin", + } + + received := bulkUploader.Upload(&asyncDestination) + Expect(received).To(Equal(expected)) + }) + }) }) From 34f7f1d4f2e7518d40dc624b400b2cf328827c48 Mon Sep 17 00:00:00 2001 From: shrouti1507 Date: Tue, 10 Dec 2024 13:29:10 +0530 Subject: [PATCH 05/10] feat: correcting lint errors --- .../clevertap_segment/clevertapSegment.go | 8 ++++---- .../clevertap_segment/clevertap_test.go | 5 +++-- .../asyncdestinationmanager/clevertap_segment/types.go | 2 +- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go index cf39e195de..9ee42ce72f 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go @@ -121,7 +121,7 @@ func (u *ClevertapServiceImpl) UploadBulkFile(filePath, presignedURL string) err return nil } -func (u *ClevertapBulkUploader) getPresignedS3URL(appKey, accessToken string, clevertapService ClevertapService) (string, error) { +func (u *ClevertapBulkUploader) getPresignedS3URL(appKey, accessToken string) (string, error) { data := &HttpRequestData{ Method: http.MethodPost, Endpoint: u.presignedURLEndpoint, @@ -187,7 +187,7 @@ func (u *ClevertapServiceImpl) convertToConnectionConfig(conn *backendconfig.Con return &connConfig, nil } -func (u *ClevertapBulkUploader) namingSegment(destination *backendconfig.DestinationT, presignedURL, csvFilePath, appKey, accessToken string, clevertapService ClevertapService) error { +func (u *ClevertapBulkUploader) namingSegment(presignedURL, csvFilePath, appKey, accessToken string) error { url := u.notifyEndpoint // Construct the request payload @@ -271,7 +271,7 @@ func (u *ClevertapBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS "destType": u.destName, }) - presignedURL, urlErr := u.getPresignedS3URL(u.appKey, u.accessToken, u.service) + presignedURL, urlErr := u.getPresignedS3URL(u.appKey, u.accessToken) if urlErr != nil { eventsAbortedStat := u.statsFactory.NewTaggedStat("failed_job_count", stats.CountType, map[string]string{ @@ -321,7 +321,7 @@ func (u *ClevertapBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS failedJobs = append(failedJobs, actionFiles.FailedJobIDs...) successJobs = append(successJobs, actionFiles.SuccessfulJobIDs...) - errorDuringNaming := u.namingSegment(destination, presignedURL, actionFiles.CSVFilePath, u.appKey, u.accessToken, u.service) + errorDuringNaming := u.namingSegment(presignedURL, actionFiles.CSVFilePath, u.appKey, u.accessToken) if errorDuringNaming != nil { return common.AsyncUploadOutput{ diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go index 0901c84098..730f806e1e 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go @@ -10,6 +10,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-go-kit/bytesize" @@ -52,7 +53,8 @@ var _ = Describe("CLEVERTAP_SEGMENT test", func() { // Ensure the log directory exists logDir := "/tmp/rudder-async-destination-logs" - os.MkdirAll(logDir, os.ModePerm) // Create the directory if it doesn't exist + err := os.MkdirAll(logDir, os.ModePerm) // Create the directory if it doesn't exist + Expect(err).NotTo(HaveOccurred()) }) AfterEach(func() { @@ -277,6 +279,5 @@ var _ = Describe("CLEVERTAP_SEGMENT test", func() { received := bulkUploader.Upload(&asyncDestination) Expect(received).To(Equal(expected)) }) - }) }) diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go index 6cac4cf80a..83e85f7ad7 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go @@ -103,7 +103,7 @@ type Uploader interface { PopulateCsvFile(actionFile *ActionFileInfo, line string, data Data) error convertToConnectionConfig(conn *backendconfig.Connection) (*ConnectionConfig, error) getPresignedS3URL(string, string, ClevertapService) (string, error) - namingSegment(destination *backendconfig.DestinationT, presignedURL, csvFilePath, appKey, accessToken string, clevertapService ClevertapService) error + namingSegment(presignedURL, csvFilePath, appKey, accessToken string) error } type HttpClient interface { From 31c5c112d6b6f9a647faba756fa9d823545b0330 Mon Sep 17 00:00:00 2001 From: shrouti1507 Date: Tue, 10 Dec 2024 16:02:20 +0530 Subject: [PATCH 06/10] feat: adding more test cases --- .../clevertap_segment/clevertapSegment.go | 66 +------------- .../clevertap_segment/clevertap_test.go | 51 +++++++++++ .../clevertap_segment/utils.go | 65 ++++++++++++++ .../clevertap_segment/utils_test.go | 89 +++++++++++++++++++ 4 files changed, 206 insertions(+), 65 deletions(-) create mode 100644 router/batchrouter/asyncdestinationmanager/clevertap_segment/utils_test.go diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go index 9ee42ce72f..a9078846a9 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go @@ -7,12 +7,10 @@ import ( "io" "net/http" "os" - "strings" "time" "github.com/rudderlabs/rudder-go-kit/stats" - backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/jobsdb" "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" ) @@ -23,41 +21,6 @@ type ClevertapServiceImpl struct { ConnectionConfig *ConnectionConfig } -// GetCleverTapEndpoint returns the API endpoint for the given region -func (u *ClevertapServiceImpl) getCleverTapEndpoint(region string) (string, error) { - // Mapping of regions to endpoints - endpoints := map[string]string{ - "IN": "in1.api.clevertap.com", - "SINGAPORE": "sg1.api.clevertap.com", - "US": "us1.api.clevertap.com", - "INDONESIA": "aps3.api.clevertap.com", - "UAE": "mec1.api.clevertap.com", - "EU": "api.clevertap.com", - } - - // Normalize the region input to uppercase for case-insensitivity - region = strings.ToUpper(region) - - // Check if the region exists in the map - if endpoint, exists := endpoints[region]; exists { - return endpoint, nil - } - - // Return an error if the region is not recognized - return "", fmt.Errorf("unknown region: %s", region) -} - -func (u *ClevertapServiceImpl) getBulkApi(destConfig DestinationConfig) *ClevertapServiceImpl { - endpoint, err := u.getCleverTapEndpoint(destConfig.Region) - if err != nil { - return nil - } - return &ClevertapServiceImpl{ - BulkApi: fmt.Sprintf("https://%s/get_custom_list_segment_url", endpoint), - NotifyApi: fmt.Sprintf("https://%s/upload_custom_list_segment_completed", endpoint), - } -} - func (*ClevertapBulkUploader) Transform(job *jobsdb.JobT) (string, error) { return common.GetMarshalledData(string(job.EventPayload), job.JobID) } @@ -160,33 +123,6 @@ func (u *ClevertapBulkUploader) getPresignedS3URL(appKey, accessToken string) (s return result.PresignedS3URL, nil } -// Function to convert *backendconfig.Connection to ConnectionConfig using marshal and unmarshal -func (u *ClevertapServiceImpl) convertToConnectionConfig(conn *backendconfig.Connection) (*ConnectionConfig, error) { - if conn == nil { - return nil, fmt.Errorf("connection is nil") - } - - // Marshal the backendconfig.Connection to JSON - data, err := json.Marshal(conn) - if err != nil { - return nil, fmt.Errorf("failed to marshal connection: %w", err) - } - - // Unmarshal the JSON into ConnectionConfig - var connConfig ConnectionConfig - err = json.Unmarshal(data, &connConfig) - if err != nil { - return nil, fmt.Errorf("failed to unmarshal to ConnectionConfig: %w", err) - } - - // Set default SenderName if it is empty - if connConfig.Config.Destination.SenderName == "" { - connConfig.Config.Destination.SenderName = DEFAULT_SENDER_NAME - } - - return &connConfig, nil -} - func (u *ClevertapBulkUploader) namingSegment(presignedURL, csvFilePath, appKey, accessToken string) error { url := u.notifyEndpoint @@ -283,7 +219,7 @@ func (u *ClevertapBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS AbortCount: len(asyncDestStruct.ImportingJobIDs), DestinationID: asyncDestStruct.Destination.ID, AbortJobIDs: asyncDestStruct.ImportingJobIDs, - AbortReason: fmt.Sprintf("%s %v", "Error while fetching presigned url", err.Error()), + AbortReason: fmt.Sprintf("%s %v", "Error while fetching presigned url", urlErr), } } diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go index 730f806e1e..69f24472d9 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go @@ -161,6 +161,57 @@ var _ = Describe("CLEVERTAP_SEGMENT test", func() { Expect(received).To(Equal(expected)) }) + It("TestClevertapErrorWhilePreSignedURLFetch", func() { + initClevertap() + ctrl := gomock.NewController(GinkgoT()) + defer ctrl.Finish() + + clevertapService := mocks.NewMockClevertapService(ctrl) + clevertapServiceImpl := ClevertapSegment.ClevertapServiceImpl{ + BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", + NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", + ConnectionConfig: &ClevertapSegment.ConnectionConfig{ + SourceId: "source123", + DestinationId: "destination456", + Enabled: true, + Config: ClevertapSegment.ConnConfig{ + Destination: ClevertapSegment.Destination{ + SchemaVersion: "v1.0", + SegmentName: "User Segment A", + AdminEmail: "admin@example.com", + SenderName: "Rudderstack", + }, + }, + }, + } + + bulkUploader := common.SimpleAsyncDestinationManager{UploaderAndTransformer: ClevertapSegment.NewClevertapBulkUploader(logger.NOP, stats.NOP, "CLEVERTAP_SEGMENT", destination.Config["clevertapAccountKey"].(string), destination.Config["clevertapAccountId"].(string), &clevertapServiceImpl, clevertapService, clevertapServiceImpl.ConnectionConfig)} + + asyncDestination := common.AsyncDestinationStruct{ + ImportingJobIDs: []int64{1, 2, 3, 4}, + FailedJobIDs: []int64{}, + FileName: filepath.Join(currentDir, "testdata/uploadData.txt"), + Destination: &destination, + } + + // Mock handling for MakeHTTPRequest + clevertapService.EXPECT(). + MakeHTTPRequest(gomock.Any()). + Return([]byte(`{"error": "Invalid Credentials", "status": "fail", "code": 401}`), 401, nil). + Times(1) + + expected := common.AsyncUploadOutput{ + AbortReason: "Error while fetching presigned url Error while fetching preSignedUrl: Invalid Credentials", + ImportingJobIDs: nil, + AbortJobIDs: []int64{1, 2, 3, 4}, + ImportingCount: 0, + AbortCount: 4, + } + + received := bulkUploader.Upload(&asyncDestination) + Expect(received).To(Equal(expected)) + }) + It("TestSuccessfulClevertapUpload", func() { initClevertap() ctrl := gomock.NewController(GinkgoT()) diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go index d07063c3b2..a04c45043e 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go @@ -3,15 +3,18 @@ package clevertapSegment import ( "bufio" "encoding/csv" + "encoding/json" "fmt" "os" "path/filepath" + "strings" "github.com/google/uuid" jsoniter "github.com/json-iterator/go" "github.com/rudderlabs/rudder-go-kit/stats" + backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/utils/misc" ) @@ -153,3 +156,65 @@ func (u *ClevertapBulkUploader) createCSVFile(existingFilePath string) (*ActionF return actionFile, nil } + +// GetCleverTapEndpoint returns the API endpoint for the given region +func (u *ClevertapServiceImpl) getCleverTapEndpoint(region string) (string, error) { + // Mapping of regions to endpoints + endpoints := map[string]string{ + "IN": "in1.api.clevertap.com", + "SINGAPORE": "sg1.api.clevertap.com", + "US": "us1.api.clevertap.com", + "INDONESIA": "aps3.api.clevertap.com", + "UAE": "mec1.api.clevertap.com", + "EU": "api.clevertap.com", + } + + // Normalize the region input to uppercase for case-insensitivity + region = strings.ToUpper(region) + + // Check if the region exists in the map + if endpoint, exists := endpoints[region]; exists { + return endpoint, nil + } + + // Return an error if the region is not recognized + return "", fmt.Errorf("unknown region: %s", region) +} + +func (u *ClevertapServiceImpl) getBulkApi(destConfig DestinationConfig) *ClevertapServiceImpl { + endpoint, err := u.getCleverTapEndpoint(destConfig.Region) + if err != nil { + return nil + } + return &ClevertapServiceImpl{ + BulkApi: fmt.Sprintf("https://%s/get_custom_list_segment_url", endpoint), + NotifyApi: fmt.Sprintf("https://%s/upload_custom_list_segment_completed", endpoint), + } +} + +// Function to convert *backendconfig.Connection to ConnectionConfig using marshal and unmarshal +func (u *ClevertapServiceImpl) convertToConnectionConfig(conn *backendconfig.Connection) (*ConnectionConfig, error) { + if conn == nil { + return nil, fmt.Errorf("connection is nil") + } + + // Marshal the backendconfig.Connection to JSON + data, err := json.Marshal(conn) + if err != nil { + return nil, fmt.Errorf("failed to marshal connection: %w", err) + } + + // Unmarshal the JSON into ConnectionConfig + var connConfig ConnectionConfig + err = json.Unmarshal(data, &connConfig) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal to ConnectionConfig: %w", err) + } + + // Set default SenderName if it is empty + if connConfig.Config.Destination.SenderName == "" { + connConfig.Config.Destination.SenderName = DEFAULT_SENDER_NAME + } + + return &connConfig, nil +} diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils_test.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils_test.go new file mode 100644 index 0000000000..a8fc7e9687 --- /dev/null +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils_test.go @@ -0,0 +1,89 @@ +package clevertapSegment + +import ( + "testing" + + backendconfig "github.com/rudderlabs/rudder-server/backend-config" +) + +func TestGetCleverTapEndpoint(t *testing.T) { + tests := []struct { + region string + expectErr bool + expectURL string + }{ + {"IN", false, "in1.api.clevertap.com"}, + {"SINGAPORE", false, "sg1.api.clevertap.com"}, + {"US", false, "us1.api.clevertap.com"}, + {"UNKNOWN", true, ""}, + } + + for _, test := range tests { + t.Run(test.region, func(t *testing.T) { + service := &ClevertapServiceImpl{} + endpoint, err := service.getCleverTapEndpoint(test.region) + + if test.expectErr && err == nil { + t.Errorf("expected an error for region %s, got none", test.region) + } + if !test.expectErr && err != nil { + t.Errorf("did not expect an error for region %s, got: %v", test.region, err) + } + if endpoint != test.expectURL { + t.Errorf("expected URL %s, got %s", test.expectURL, endpoint) + } + }) + } +} + +func TestGetBulkApi(t *testing.T) { + service := &ClevertapServiceImpl{} + destConfig := DestinationConfig{Region: "IN"} + + bulkApi := service.getBulkApi(destConfig) + if bulkApi == nil { + t.Fatal("expected a non-nil bulk API service") + } + if bulkApi.BulkApi == "" { + t.Error("expected a non-empty BulkApi URL") + } +} + +func TestConvertToConnectionConfig(t *testing.T) { + tests := []struct { + conn *backendconfig.Connection + expectErr bool + }{ + {nil, true}, + {&backendconfig.Connection{ + SourceID: "source123", + DestinationID: "destination456", + Enabled: true, + Config: map[string]interface{}{ + "Destination": map[string]interface{}{ + "SchemaVersion": "v1.0", + "SegmentName": "User Segment A", + "AdminEmail": "admin@example.com", + "SenderName": "Rudderstack", + }, + }, + }, false}, + } + + for _, test := range tests { + t.Run("", func(t *testing.T) { + service := &ClevertapServiceImpl{} + connConfig, err := service.convertToConnectionConfig(test.conn) + + if test.expectErr && err == nil { + t.Error("expected an error, got none") + } + if !test.expectErr && err != nil { + t.Errorf("did not expect an error, got: %v", err) + } + if !test.expectErr && connConfig.Config.Destination.SenderName != DEFAULT_SENDER_NAME { + t.Errorf("expected SenderName to be set to default, got: %s", connConfig.Config.Destination.SenderName) + } + }) + } +} From 5b71fe9b9905083ff4c41db1fe0381bfa994fce2 Mon Sep 17 00:00:00 2001 From: Sai Sankeerth Date: Wed, 11 Dec 2024 13:27:01 +0530 Subject: [PATCH 07/10] chore: refactor --- .../clevertap_segment/clevertap_test.go | 39 +- .../clevertap_segment/manager.go | 43 ++- .../clevertap_segment/service.go | 73 ++++ .../clevertap_segment/types.go | 23 +- .../{clevertapSegment.go => upload.go} | 358 +++++++++++------- .../clevertap_segment/utils.go | 179 ++------- .../clevertap_segment/utils_test.go | 115 +++--- 7 files changed, 459 insertions(+), 371 deletions(-) create mode 100644 router/batchrouter/asyncdestinationmanager/clevertap_segment/service.go rename router/batchrouter/asyncdestinationmanager/clevertap_segment/{clevertapSegment.go => upload.go} (56%) diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go index 69f24472d9..9bbe2a8c02 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertap_test.go @@ -65,6 +65,10 @@ var _ = Describe("CLEVERTAP_SEGMENT test", func() { initClevertap() ctrl := gomock.NewController(GinkgoT()) defer ctrl.Finish() + endpoints := &ClevertapSegment.Endpoints{ + BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", + NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", + } clevertapService := mocks.NewMockClevertapService(ctrl) clevertapServiceImpl := ClevertapSegment.ClevertapServiceImpl{ @@ -84,7 +88,7 @@ var _ = Describe("CLEVERTAP_SEGMENT test", func() { }, }, } - bulkUploader := common.SimpleAsyncDestinationManager{UploaderAndTransformer: ClevertapSegment.NewClevertapBulkUploader(logger.NOP, stats.NOP, "CLEVERTAP_SEGMENT", destination.Config["clevertapAccountKey"].(string), destination.Config["clevertapAccountId"].(string), &clevertapServiceImpl, clevertapService, clevertapServiceImpl.ConnectionConfig)} + bulkUploader := common.SimpleAsyncDestinationManager{UploaderAndTransformer: ClevertapSegment.NewClevertapBulkUploader(logger.NOP, stats.NOP, "CLEVERTAP_SEGMENT", destination.Config["clevertapAccountKey"].(string), destination.Config["clevertapAccountId"].(string), endpoints, clevertapService, clevertapServiceImpl.ConnectionConfig)} asyncDestination := common.AsyncDestinationStruct{ ImportingJobIDs: []int64{1, 2, 3, 4}, FailedJobIDs: []int64{}, @@ -110,6 +114,10 @@ var _ = Describe("CLEVERTAP_SEGMENT test", func() { defer ctrl.Finish() clevertapService := mocks.NewMockClevertapService(ctrl) + endpoints := &ClevertapSegment.Endpoints{ + BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", + NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", + } clevertapServiceImpl := ClevertapSegment.ClevertapServiceImpl{ BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", @@ -128,7 +136,7 @@ var _ = Describe("CLEVERTAP_SEGMENT test", func() { }, } - bulkUploader := common.SimpleAsyncDestinationManager{UploaderAndTransformer: ClevertapSegment.NewClevertapBulkUploader(logger.NOP, stats.NOP, "CLEVERTAP_SEGMENT", destination.Config["clevertapAccountKey"].(string), destination.Config["clevertapAccountId"].(string), &clevertapServiceImpl, clevertapService, clevertapServiceImpl.ConnectionConfig)} + bulkUploader := common.SimpleAsyncDestinationManager{UploaderAndTransformer: ClevertapSegment.NewClevertapBulkUploader(logger.NOP, stats.NOP, "CLEVERTAP_SEGMENT", destination.Config["clevertapAccountKey"].(string), destination.Config["clevertapAccountId"].(string), endpoints, clevertapService, clevertapServiceImpl.ConnectionConfig)} asyncDestination := common.AsyncDestinationStruct{ ImportingJobIDs: []int64{1, 2, 3, 4}, @@ -185,7 +193,12 @@ var _ = Describe("CLEVERTAP_SEGMENT test", func() { }, } - bulkUploader := common.SimpleAsyncDestinationManager{UploaderAndTransformer: ClevertapSegment.NewClevertapBulkUploader(logger.NOP, stats.NOP, "CLEVERTAP_SEGMENT", destination.Config["clevertapAccountKey"].(string), destination.Config["clevertapAccountId"].(string), &clevertapServiceImpl, clevertapService, clevertapServiceImpl.ConnectionConfig)} + endpoints := &ClevertapSegment.Endpoints{ + BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", + NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", + } + + bulkUploader := common.SimpleAsyncDestinationManager{UploaderAndTransformer: ClevertapSegment.NewClevertapBulkUploader(logger.NOP, stats.NOP, "CLEVERTAP_SEGMENT", destination.Config["clevertapAccountKey"].(string), destination.Config["clevertapAccountId"].(string), endpoints, clevertapService, clevertapServiceImpl.ConnectionConfig)} asyncDestination := common.AsyncDestinationStruct{ ImportingJobIDs: []int64{1, 2, 3, 4}, @@ -235,7 +248,11 @@ var _ = Describe("CLEVERTAP_SEGMENT test", func() { }, }, } - bulkUploader := common.SimpleAsyncDestinationManager{UploaderAndTransformer: ClevertapSegment.NewClevertapBulkUploader(logger.NOP, stats.NOP, "CLEVERTAP_SEGMENT", destination.Config["clevertapAccountKey"].(string), destination.Config["clevertapAccountId"].(string), &clevertapServiceImpl, clevertapService, clevertapServiceImpl.ConnectionConfig)} + endpoints := &ClevertapSegment.Endpoints{ + BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", + NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", + } + bulkUploader := common.SimpleAsyncDestinationManager{UploaderAndTransformer: ClevertapSegment.NewClevertapBulkUploader(logger.NOP, stats.NOP, "CLEVERTAP_SEGMENT", destination.Config["clevertapAccountKey"].(string), destination.Config["clevertapAccountId"].(string), endpoints, clevertapService, clevertapServiceImpl.ConnectionConfig)} asyncDestination := common.AsyncDestinationStruct{ ImportingJobIDs: []int64{1, 2, 3, 4}, FailedJobIDs: []int64{}, @@ -277,6 +294,10 @@ var _ = Describe("CLEVERTAP_SEGMENT test", func() { defer ctrl.Finish() clevertapService := mocks.NewMockClevertapService(ctrl) + endpoints := &ClevertapSegment.Endpoints{ + BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", + NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", + } clevertapServiceImpl := ClevertapSegment.ClevertapServiceImpl{ BulkApi: "https://api.clevertap.com/get_custom_list_segment_url", NotifyApi: "https://api.clevertap.com/upload_custom_list_segment_completed", @@ -294,7 +315,15 @@ var _ = Describe("CLEVERTAP_SEGMENT test", func() { }, }, } - bulkUploader := common.SimpleAsyncDestinationManager{UploaderAndTransformer: ClevertapSegment.NewClevertapBulkUploader(logger.NOP, stats.NOP, "CLEVERTAP_SEGMENT", destination.Config["clevertapAccountKey"].(string), destination.Config["clevertapAccountId"].(string), &clevertapServiceImpl, clevertapService, clevertapServiceImpl.ConnectionConfig)} + clevertapBulkUploader := ClevertapSegment.NewClevertapBulkUploader(logger.NOP, stats.NOP, + "CLEVERTAP_SEGMENT", + destination.Config["clevertapAccountKey"].(string), + destination.Config["clevertapAccountId"].(string), + endpoints, + clevertapService, + clevertapServiceImpl.ConnectionConfig, + ) + bulkUploader := common.SimpleAsyncDestinationManager{UploaderAndTransformer: clevertapBulkUploader} asyncDestination := common.AsyncDestinationStruct{ ImportingJobIDs: []int64{1, 2, 3, 4}, FailedJobIDs: []int64{}, diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go index c95cab1b36..c63524360d 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go @@ -1,9 +1,10 @@ package clevertapSegment import ( - "encoding/json" "fmt" + jsoniter "github.com/json-iterator/go" + "github.com/rudderlabs/rudder-go-kit/bytesize" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" @@ -12,7 +13,18 @@ import ( "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" ) -func NewClevertapBulkUploader(logger logger.Logger, statsFactory stats.Stats, destinationName, accessToken, appKey string, clevertapEndpoints *ClevertapServiceImpl, clevertap ClevertapService, connectionConfig *ConnectionConfig) common.AsyncUploadAndTransformManager { +var jsonFast = jsoniter.ConfigCompatibleWithStandardLibrary + +func NewClevertapBulkUploader( + logger logger.Logger, + statsFactory stats.Stats, + destinationName, + accessToken, + appKey string, + clevertapEndpoints *Endpoints, + clevertap ClevertapService, + connectionConfig *ConnectionConfig, +) common.AsyncUploadAndTransformManager { return &ClevertapBulkUploader{ destName: destinationName, logger: logger.Child("Clevertap").Child("ClevertapBulkUploader"), @@ -29,25 +41,34 @@ func NewClevertapBulkUploader(logger logger.Logger, statsFactory stats.Stats, de } func NewManager(logger logger.Logger, statsFactory stats.Stats, destination *backendconfig.DestinationT, connection *backendconfig.Connection) (common.AsyncDestinationManager, error) { - destConfig := DestinationConfig{} - jsonConfig, err := json.Marshal(destination.Config) + var destConfig DestinationConfig + destConfig, err := Convert[map[string]interface{}, DestinationConfig](destination.Config) if err != nil { - return nil, fmt.Errorf("error in marshalling destination config: %v", err) + return nil, fmt.Errorf("error in converting destination config: %v", err) } - err = json.Unmarshal(jsonConfig, &destConfig) + + clevertapConnectionConfig, err := ConvertToConnectionConfig(connection) if err != nil { - return nil, fmt.Errorf("error in unmarshalling destination config: %v", err) + return nil, fmt.Errorf("error converting to connection config for clevertap segment: %v", err) } destName := destination.DestinationDefinition.Name clevertapService := &ClevertapServiceImpl{} - clevertapImpl := clevertapService.getBulkApi(destConfig) - clevertapConnectionConfig, err := clevertapService.convertToConnectionConfig(connection) + endpoints, err := GetBulkApi(destConfig) if err != nil { - return nil, fmt.Errorf("error converting to connection config for clevertap segment: %v", err) + return nil, fmt.Errorf("error getting bulk api for clevertap segment: %v", err) } return common.SimpleAsyncDestinationManager{ - UploaderAndTransformer: NewClevertapBulkUploader(logger, statsFactory, destName, destConfig.AccessToken, destConfig.AppKey, clevertapImpl, clevertapService, clevertapConnectionConfig), + UploaderAndTransformer: NewClevertapBulkUploader( + logger, + statsFactory, + destName, + destConfig.AccessToken, + destConfig.AppKey, + endpoints, + clevertapService, + &clevertapConnectionConfig, + ), }, nil } diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/service.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/service.go new file mode 100644 index 0000000000..4ed1354c2f --- /dev/null +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/service.go @@ -0,0 +1,73 @@ +package clevertapSegment + +import ( + "fmt" + "io" + "net/http" + "os" +) + +type ClevertapServiceImpl struct { + BulkApi string + NotifyApi string + ConnectionConfig *ConnectionConfig +} + +func (u *ClevertapServiceImpl) MakeHTTPRequest(data *HttpRequestData) ([]byte, int, error) { + req, err := http.NewRequest(data.Method, data.Endpoint, data.Body) + if err != nil { + return nil, 500, err + } + req.Header.Add("X-CleverTap-Account-Id", data.appKey) + req.Header.Add("X-CleverTap-Passcode", data.accessToken) + req.Header.Add("content-type", data.ContentType) + client := &http.Client{} + res, err := client.Do(req) + if err != nil { + return nil, 500, err + } + defer res.Body.Close() + + body, err := io.ReadAll(res.Body) + if err != nil { + return nil, 500, err + } + return body, res.StatusCode, err +} + +func (u *ClevertapServiceImpl) UploadBulkFile(filePath, presignedURL string) error { + // Open the file + file, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + // Get the file information + fileInfo, err := file.Stat() + if err != nil { + return fmt.Errorf("failed to stat file: %w", err) + } + fileSize := fileInfo.Size() + + // Create the PUT request + req, err := http.NewRequest("PUT", presignedURL, file) + if err != nil { + return fmt.Errorf("failed to create PUT request: %w", err) + } + req.ContentLength = fileSize + // Execute the request + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to upload file: %w", err) + } + defer resp.Body.Close() + + // Check the response status + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("upload failed, status: %s, response: %s", resp.Status, string(body)) + } + return nil +} diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go index 83e85f7ad7..174fca62da 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/types.go @@ -7,28 +7,10 @@ import ( "net/http" "os" - "github.com/rudderlabs/rudder-go-kit/stats" - - "github.com/rudderlabs/rudder-go-kit/logger" - backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" ) -type ClevertapBulkUploader struct { - destName string - logger logger.Logger - statsFactory stats.Stats - appKey string - accessToken string - presignedURLEndpoint string - notifyEndpoint string - fileSizeLimit int64 - jobToCSVMap map[int64]int64 - service ClevertapService - clevertapConnectionConfig *ConnectionConfig -} - type DestinationConfig struct { AppKey string `json:"appKey"` AccessToken string `json:"accessToken"` @@ -87,6 +69,11 @@ type Destination struct { SenderName string `json:"senderName"` } +type Endpoints struct { + BulkApi string + NotifyApi string +} + type ConnConfig struct { Destination Destination `json:"destination"` } diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/upload.go similarity index 56% rename from router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go rename to router/batchrouter/asyncdestinationmanager/clevertap_segment/upload.go index a9078846a9..23df1866b5 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/clevertapSegment.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/upload.go @@ -1,185 +1,171 @@ package clevertapSegment import ( + "bufio" "bytes" - "encoding/json" "fmt" - "io" "net/http" "os" + "path/filepath" "time" - "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/google/uuid" + "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-server/jobsdb" "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" + "github.com/rudderlabs/rudder-server/utils/misc" ) -type ClevertapServiceImpl struct { - BulkApi string - NotifyApi string - ConnectionConfig *ConnectionConfig +type ClevertapBulkUploader struct { + destName string + logger logger.Logger + statsFactory stats.Stats + appKey string + accessToken string + presignedURLEndpoint string + notifyEndpoint string + fileSizeLimit int64 + jobToCSVMap map[int64]int64 + service ClevertapService + clevertapConnectionConfig *ConnectionConfig } func (*ClevertapBulkUploader) Transform(job *jobsdb.JobT) (string, error) { return common.GetMarshalledData(string(job.EventPayload), job.JobID) } -func (u *ClevertapServiceImpl) MakeHTTPRequest(data *HttpRequestData) ([]byte, int, error) { - req, err := http.NewRequest(data.Method, data.Endpoint, data.Body) - if err != nil { - return nil, 500, err - } - req.Header.Add("X-CleverTap-Account-Id", data.appKey) - req.Header.Add("X-CleverTap-Passcode", data.accessToken) - req.Header.Add("content-type", data.ContentType) - client := &http.Client{} - res, err := client.Do(req) - if err != nil { - return nil, 500, err - } - defer res.Body.Close() - - body, err := io.ReadAll(res.Body) - if err != nil { - return nil, 500, err - } - return body, res.StatusCode, err -} - -func (u *ClevertapServiceImpl) UploadBulkFile(filePath, presignedURL string) error { - // Open the file - file, err := os.Open(filePath) - if err != nil { - return fmt.Errorf("failed to open file: %w", err) - } - defer file.Close() +func (u *ClevertapBulkUploader) PopulateCsvFile(actionFile *ActionFileInfo, line string, data Data) error { + newFileSize := actionFile.FileSize + int64(len(line)) + if newFileSize < u.fileSizeLimit { + actionFile.FileSize = newFileSize + actionFile.EventCount += 1 - // Get the file information - fileInfo, err := file.Stat() - if err != nil { - return fmt.Errorf("failed to stat file: %w", err) - } - fileSize := fileInfo.Size() + // Unmarshal Properties into a map of jsonLib.RawMessage + var fields map[string]interface{} + if err := jsonFast.Unmarshal(data.Message.Fields, &fields); err != nil { + return err + } - // Create the PUT request - req, err := http.NewRequest("PUT", presignedURL, file) - if err != nil { - return fmt.Errorf("failed to create PUT request: %w", err) - } - req.ContentLength = fileSize - // Execute the request - client := &http.Client{} - resp, err := client.Do(req) - if err != nil { - return fmt.Errorf("failed to upload file: %w", err) - } - defer resp.Body.Close() + // Check for presence of "i" and "g" values + if valueG, okG := fields["g"]; okG { + // If "g" exists, prioritize it and omit "i" + csvRow := []string{"g", fmt.Sprintf("%v", valueG)} // Type: g + if err := actionFile.CSVWriter.Write(csvRow); err != nil { + return err + } + } else if valueI, okI := fields["i"]; okI { + // Write "i" value only if "g" does not exist + csvRow := []string{"i", fmt.Sprintf("%v", valueI)} // Type: i + if err := actionFile.CSVWriter.Write(csvRow); err != nil { + return err + } + } - // Check the response status - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("upload failed, status: %s, response: %s", resp.Status, string(body)) + // Write the CSV header only once + if actionFile.EventCount == 1 { + // Fixed headers + headers := []string{"Type", "Identity"} + if err := actionFile.CSVWriter.Write(headers); err != nil { + return err + } + } + actionFile.CSVWriter.Flush() + actionFile.SuccessfulJobIDs = append(actionFile.SuccessfulJobIDs, data.Metadata.JobID) + } else { + // fmt.println("size exceeding") + actionFile.FailedJobIDs = append(actionFile.FailedJobIDs, data.Metadata.JobID) } return nil } -func (u *ClevertapBulkUploader) getPresignedS3URL(appKey, accessToken string) (string, error) { - data := &HttpRequestData{ - Method: http.MethodPost, - Endpoint: u.presignedURLEndpoint, - ContentType: "application/json", - appKey: appKey, - accessToken: accessToken, - } - - body, statusCode, err := u.service.MakeHTTPRequest(data) +func (u *ClevertapBulkUploader) createCSVFile(existingFilePath string) (*ActionFileInfo, error) { + // Create a temporary directory using misc.CreateTMPDIR + tmpDirPath, err := misc.CreateTMPDIR() if err != nil { - return "", err + return nil, fmt.Errorf("failed to create temporary directory: %v", err) } - // Parse the response - var result struct { - PresignedS3URL string `json:"presignedS3URL"` - Expiry string `json:"expiry"` - Status string `json:"status"` - Error string `json:"error"` - Code int `json:"code"` - } - if err := json.Unmarshal(body, &result); err != nil { - return "", err - } + // Define a local directory name within the temp directory + localTmpDirName := fmt.Sprintf("/%s/", misc.RudderAsyncDestinationLogs) - if statusCode != 200 { - err := fmt.Errorf("Error while fetching preSignedUrl: %s", result.Error) - return "", err - } + // Combine the temporary directory with the local directory name and generate a unique file path + path := filepath.Join(tmpDirPath, localTmpDirName, uuid.NewString()) + csvFilePath := fmt.Sprintf("%v.csv", path) - if result.PresignedS3URL == "" { - err := fmt.Errorf("presigned URL is empty after parsing") - return "", err + // Initialize the CSV writer with the generated file path + actionFile, err := CreateCSVWriter(csvFilePath) + if err != nil { + return nil, err } + defer actionFile.File.Close() // Ensure the file is closed when done - return result.PresignedS3URL, nil -} + // Store the CSV file path in the ActionFileInfo struct + actionFile.CSVFilePath = csvFilePath -func (u *ClevertapBulkUploader) namingSegment(presignedURL, csvFilePath, appKey, accessToken string) error { - url := u.notifyEndpoint - - // Construct the request payload - payload := map[string]interface{}{ - "name": u.clevertapConnectionConfig.Config.Destination.SegmentName, - "email": u.clevertapConnectionConfig.Config.Destination.AdminEmail, - "filename": csvFilePath, - "creator": u.clevertapConnectionConfig.Config.Destination.SenderName, - "url": presignedURL, - "replace": true, - } - - payloadBytes, err := json.Marshal(payload) + // Create a scanner to read the existing file line by line + existingFile, err := os.Open(existingFilePath) if err != nil { - return err + return nil, fmt.Errorf("failed to open existing file") } + defer existingFile.Close() + + scanner := bufio.NewScanner(existingFile) + scanner.Buffer(nil, 50000*1024) // Adjust the buffer size if necessary + + for scanner.Scan() { + line := scanner.Text() + var data Data + if err := jsonFast.Unmarshal([]byte(line), &data); err != nil { + // Collect the failed job ID only if it's valid + if data.Metadata.JobID != 0 { // Check if JobID is not zero + actionFile.FailedJobIDs = append(actionFile.FailedJobIDs, data.Metadata.JobID) + } + continue + } - // Create HttpRequestData - data := &HttpRequestData{ - Method: http.MethodPost, - Endpoint: url, - Body: bytes.NewBuffer(payloadBytes), - ContentType: "application/json", - appKey: appKey, - accessToken: accessToken, - } + // Calculate the payload size and observe it + payloadSizeStat := u.statsFactory.NewTaggedStat("payload_size", stats.HistogramType, + map[string]string{ + "module": "batch_router", + "destType": u.destName, + }) + payloadSizeStat.Observe(float64(len(data.Message.Fields))) - // Use MakeHTTPRequest to send the request - body, statusCode, err := u.service.MakeHTTPRequest(data) - if err != nil { - return err - } - // Parse the response - var result struct { - SegmentID int `json:"Segment ID"` - Status string `json:"status"` - Error string `json:"error"` - Code int `json:"code"` - } - if err := json.Unmarshal(body, &result); err != nil { - return err + // Populate the CSV file and collect success/failure job IDs + err := u.PopulateCsvFile(actionFile, line, data) + if err != nil { + // Collect the failed job ID only if it's valid + if data.Metadata.JobID != 0 { // Check if JobID is not zero + actionFile.FailedJobIDs = append(actionFile.FailedJobIDs, data.Metadata.JobID) + } + } } - if statusCode != 200 { - err := fmt.Errorf("Error while namimng segment: %s", result.Error) - return err + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("error while scanning file: %v", err) } - if result.SegmentID == 0 { - err := fmt.Errorf("Segment Creation is Unsuccessful") - return err + // After processing, calculate the final file size + fileInfo, err := os.Stat(actionFile.CSVFilePath) + if err != nil { + return nil, fmt.Errorf("failed to retrieve file info: %v", err) } + actionFile.FileSize = fileInfo.Size() - return nil + return actionFile, nil } +/* +1. Read CSV file from the path +2. API call to get the presigned URL +3. API call to upload the file to the presigned URL +4. Check if the file is uploaded successfully +5. API call to name the segment +6. Remove the file after uploading +7. Return the success and failed job IDs +*/ func (u *ClevertapBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStruct) common.AsyncUploadOutput { destination := asyncDestStruct.Destination filePath := asyncDestStruct.FileName @@ -207,7 +193,7 @@ func (u *ClevertapBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS "destType": u.destName, }) - presignedURL, urlErr := u.getPresignedS3URL(u.appKey, u.accessToken) + presignedURL, urlErr := u.getPresignedS3URL(u.appKey, u.accessToken) // API if urlErr != nil { eventsAbortedStat := u.statsFactory.NewTaggedStat("failed_job_count", stats.CountType, map[string]string{ @@ -224,7 +210,7 @@ func (u *ClevertapBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS } startTime := time.Now() - errorDuringUpload := u.service.UploadBulkFile(actionFiles.CSVFilePath, presignedURL) + errorDuringUpload := u.service.UploadBulkFile(actionFiles.CSVFilePath, presignedURL) // API uploadTimeStat.Since(startTime) if errorDuringUpload != nil { @@ -257,7 +243,7 @@ func (u *ClevertapBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS failedJobs = append(failedJobs, actionFiles.FailedJobIDs...) successJobs = append(successJobs, actionFiles.SuccessfulJobIDs...) - errorDuringNaming := u.namingSegment(presignedURL, actionFiles.CSVFilePath, u.appKey, u.accessToken) + errorDuringNaming := u.namingSegment(presignedURL, actionFiles.CSVFilePath, u.appKey, u.accessToken) // API if errorDuringNaming != nil { return common.AsyncUploadOutput{ @@ -281,3 +267,99 @@ func (u *ClevertapBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS DestinationID: destination.ID, } } + +func (u *ClevertapBulkUploader) getPresignedS3URL(appKey, accessToken string) (string, error) { + data := &HttpRequestData{ + Method: http.MethodPost, + Endpoint: u.presignedURLEndpoint, + ContentType: "application/json", + appKey: appKey, + accessToken: accessToken, + } + + body, statusCode, err := u.service.MakeHTTPRequest(data) + if err != nil { + return "", err + } + + // Parse the response + var result struct { + PresignedS3URL string `json:"presignedS3URL"` + Expiry string `json:"expiry"` + Status string `json:"status"` + Error string `json:"error"` + Code int `json:"code"` + } + if err := jsonFast.Unmarshal(body, &result); err != nil { + return "", err + } + + if statusCode != 200 { + err := fmt.Errorf("Error while fetching preSignedUrl: %s", result.Error) + return "", err + } + + if result.PresignedS3URL == "" { + err := fmt.Errorf("presigned URL is empty after parsing") + return "", err + } + + return result.PresignedS3URL, nil +} + +func (u *ClevertapBulkUploader) namingSegment(presignedURL, csvFilePath, appKey, accessToken string) error { + url := u.notifyEndpoint + + // Construct the request payload + payload := map[string]interface{}{ + "name": u.clevertapConnectionConfig.Config.Destination.SegmentName, + "email": u.clevertapConnectionConfig.Config.Destination.AdminEmail, + "filename": csvFilePath, + "creator": u.clevertapConnectionConfig.Config.Destination.SenderName, + "url": presignedURL, + "replace": true, + } + + payloadBytes, err := jsonFast.Marshal(payload) + if err != nil { + return err + } + + // Create HttpRequestData + data := &HttpRequestData{ + Method: http.MethodPost, + Endpoint: url, + Body: bytes.NewBuffer(payloadBytes), + ContentType: "application/json", + appKey: appKey, + accessToken: accessToken, + } + + // Use MakeHTTPRequest to send the request + body, statusCode, err := u.service.MakeHTTPRequest(data) + if err != nil { + return err + } + // Parse the response + var result struct { + SegmentID int `json:"Segment ID"` + Status string `json:"status"` + Error string `json:"error"` + Code int `json:"code"` + } + if err := jsonFast.Unmarshal(body, &result); err != nil { + return err + } + + if statusCode != 200 { + err := fmt.Errorf("Error while namimng segment: %s", result.Error) + return err + } + + if result.SegmentID == 0 { + err := fmt.Errorf("Segment Creation is Unsuccessful") + return err + } + + return nil +} diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go index a04c45043e..4797ecbee3 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go @@ -1,68 +1,15 @@ package clevertapSegment import ( - "bufio" "encoding/csv" - "encoding/json" "fmt" "os" - "path/filepath" "strings" - "github.com/google/uuid" - jsoniter "github.com/json-iterator/go" - - "github.com/rudderlabs/rudder-go-kit/stats" - backendconfig "github.com/rudderlabs/rudder-server/backend-config" - "github.com/rudderlabs/rudder-server/utils/misc" ) -func (u *ClevertapBulkUploader) PopulateCsvFile(actionFile *ActionFileInfo, line string, data Data) error { - newFileSize := actionFile.FileSize + int64(len(line)) - if newFileSize < u.fileSizeLimit { - actionFile.FileSize = newFileSize - actionFile.EventCount += 1 - - // Unmarshal Properties into a map of json.RawMessage - var fields map[string]interface{} - if err := jsoniter.Unmarshal(data.Message.Fields, &fields); err != nil { - return err - } - - // Check for presence of "i" and "g" values - if valueG, okG := fields["g"]; okG { - // If "g" exists, prioritize it and omit "i" - csvRow := []string{"g", fmt.Sprintf("%v", valueG)} // Type: g - if err := actionFile.CSVWriter.Write(csvRow); err != nil { - return err - } - } else if valueI, okI := fields["i"]; okI { - // Write "i" value only if "g" does not exist - csvRow := []string{"i", fmt.Sprintf("%v", valueI)} // Type: i - if err := actionFile.CSVWriter.Write(csvRow); err != nil { - return err - } - } - - // Write the CSV header only once - if actionFile.EventCount == 1 { - // Fixed headers - headers := []string{"Type", "Identity"} - if err := actionFile.CSVWriter.Write(headers); err != nil { - return err - } - } - actionFile.CSVWriter.Flush() - actionFile.SuccessfulJobIDs = append(actionFile.SuccessfulJobIDs, data.Metadata.JobID) - } else { - // fmt.println("size exceeding") - actionFile.FailedJobIDs = append(actionFile.FailedJobIDs, data.Metadata.JobID) - } - return nil -} - -func createCSVWriter(fileName string) (*ActionFileInfo, error) { +func CreateCSVWriter(fileName string) (*ActionFileInfo, error) { // Open or create the file where the CSV will be written file, err := os.Create(fileName) if err != nil { @@ -80,85 +27,19 @@ func createCSVWriter(fileName string) (*ActionFileInfo, error) { }, nil } -func (u *ClevertapBulkUploader) createCSVFile(existingFilePath string) (*ActionFileInfo, error) { - // Create a temporary directory using misc.CreateTMPDIR - tmpDirPath, err := misc.CreateTMPDIR() - if err != nil { - return nil, fmt.Errorf("failed to create temporary directory: %v", err) - } - - // Define a local directory name within the temp directory - localTmpDirName := fmt.Sprintf("/%s/", misc.RudderAsyncDestinationLogs) - - // Combine the temporary directory with the local directory name and generate a unique file path - path := filepath.Join(tmpDirPath, localTmpDirName, uuid.NewString()) - csvFilePath := fmt.Sprintf("%v.csv", path) - - // Initialize the CSV writer with the generated file path - actionFile, err := createCSVWriter(csvFilePath) +func GetBulkApi(destConfig DestinationConfig) (*Endpoints, error) { + endpoint, err := GetCleverTapEndpoint(destConfig.Region) if err != nil { return nil, err } - defer actionFile.File.Close() // Ensure the file is closed when done - - // Store the CSV file path in the ActionFileInfo struct - actionFile.CSVFilePath = csvFilePath - - // Create a scanner to read the existing file line by line - existingFile, err := os.Open(existingFilePath) - if err != nil { - return nil, fmt.Errorf("failed to open existing file") - } - defer existingFile.Close() - - scanner := bufio.NewScanner(existingFile) - scanner.Buffer(nil, 50000*1024) // Adjust the buffer size if necessary - - for scanner.Scan() { - line := scanner.Text() - var data Data - if err := jsoniter.Unmarshal([]byte(line), &data); err != nil { - // Collect the failed job ID only if it's valid - if data.Metadata.JobID != 0 { // Check if JobID is not zero - actionFile.FailedJobIDs = append(actionFile.FailedJobIDs, data.Metadata.JobID) - } - continue - } - - // Calculate the payload size and observe it - payloadSizeStat := u.statsFactory.NewTaggedStat("payload_size", stats.HistogramType, - map[string]string{ - "module": "batch_router", - "destType": u.destName, - }) - payloadSizeStat.Observe(float64(len(data.Message.Fields))) - - // Populate the CSV file and collect success/failure job IDs - err := u.PopulateCsvFile(actionFile, line, data) - if err != nil { - // Collect the failed job ID only if it's valid - if data.Metadata.JobID != 0 { // Check if JobID is not zero - actionFile.FailedJobIDs = append(actionFile.FailedJobIDs, data.Metadata.JobID) - } - } - } - - if err := scanner.Err(); err != nil { - return nil, fmt.Errorf("error while scanning file: %v", err) - } - - // After processing, calculate the final file size - fileInfo, err := os.Stat(actionFile.CSVFilePath) - if err != nil { - return nil, fmt.Errorf("failed to retrieve file info: %v", err) - } - actionFile.FileSize = fileInfo.Size() - - return actionFile, nil + return &Endpoints{ + BulkApi: fmt.Sprintf("https://%s/get_custom_list_segment_url", endpoint), + NotifyApi: fmt.Sprintf("https://%s/upload_custom_list_segment_completed", endpoint), + }, nil } // GetCleverTapEndpoint returns the API endpoint for the given region -func (u *ClevertapServiceImpl) getCleverTapEndpoint(region string) (string, error) { +func GetCleverTapEndpoint(region string) (string, error) { // Mapping of regions to endpoints endpoints := map[string]string{ "IN": "in1.api.clevertap.com", @@ -181,40 +62,36 @@ func (u *ClevertapServiceImpl) getCleverTapEndpoint(region string) (string, erro return "", fmt.Errorf("unknown region: %s", region) } -func (u *ClevertapServiceImpl) getBulkApi(destConfig DestinationConfig) *ClevertapServiceImpl { - endpoint, err := u.getCleverTapEndpoint(destConfig.Region) - if err != nil { - return nil - } - return &ClevertapServiceImpl{ - BulkApi: fmt.Sprintf("https://%s/get_custom_list_segment_url", endpoint), - NotifyApi: fmt.Sprintf("https://%s/upload_custom_list_segment_completed", endpoint), - } +type ConfigOutput interface { + ConnectionConfig | DestinationConfig } -// Function to convert *backendconfig.Connection to ConnectionConfig using marshal and unmarshal -func (u *ClevertapServiceImpl) convertToConnectionConfig(conn *backendconfig.Connection) (*ConnectionConfig, error) { - if conn == nil { - return nil, fmt.Errorf("connection is nil") - } +// Generic function to convert input to output using JSON marshal/unmarshal +func Convert[T any, U ConfigOutput](input T) (U, error) { + var output U - // Marshal the backendconfig.Connection to JSON - data, err := json.Marshal(conn) + // Marshal the input to JSON + data, err := jsonFast.Marshal(input) if err != nil { - return nil, fmt.Errorf("failed to marshal connection: %w", err) + return output, fmt.Errorf("failed to marshal input: %w", err) } - // Unmarshal the JSON into ConnectionConfig - var connConfig ConnectionConfig - err = json.Unmarshal(data, &connConfig) + // Unmarshal the JSON into the output type + err = jsonFast.Unmarshal(data, &output) if err != nil { - return nil, fmt.Errorf("failed to unmarshal to ConnectionConfig: %w", err) + return output, fmt.Errorf("failed to unmarshal to output type: %w", err) } - // Set default SenderName if it is empty + return output, nil +} + +func ConvertToConnectionConfig(conn *backendconfig.Connection) (ConnectionConfig, error) { + connConfig, err := Convert[map[string]interface{}, ConnectionConfig](conn.Config) + if err != nil { + return ConnectionConfig{}, fmt.Errorf("failed to convert to connection config: %w", err) + } if connConfig.Config.Destination.SenderName == "" { connConfig.Config.Destination.SenderName = DEFAULT_SENDER_NAME } - - return &connConfig, nil + return connConfig, nil } diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils_test.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils_test.go index a8fc7e9687..38cdd473f7 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils_test.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils_test.go @@ -3,6 +3,8 @@ package clevertapSegment import ( "testing" + "github.com/stretchr/testify/assert" + backendconfig "github.com/rudderlabs/rudder-server/backend-config" ) @@ -12,77 +14,94 @@ func TestGetCleverTapEndpoint(t *testing.T) { expectErr bool expectURL string }{ - {"IN", false, "in1.api.clevertap.com"}, - {"SINGAPORE", false, "sg1.api.clevertap.com"}, - {"US", false, "us1.api.clevertap.com"}, - {"UNKNOWN", true, ""}, + {region: "IN", expectURL: "in1.api.clevertap.com"}, + {region: "SINGAPORE", expectURL: "sg1.api.clevertap.com"}, + {region: "US", expectURL: "us1.api.clevertap.com"}, + {region: "UNKNOWN", expectErr: true, expectURL: ""}, } - for _, test := range tests { - t.Run(test.region, func(t *testing.T) { - service := &ClevertapServiceImpl{} - endpoint, err := service.getCleverTapEndpoint(test.region) + for _, tc := range tests { + t.Run(tc.region, func(t *testing.T) { + endpoint, err := GetCleverTapEndpoint(tc.region) - if test.expectErr && err == nil { - t.Errorf("expected an error for region %s, got none", test.region) - } - if !test.expectErr && err != nil { - t.Errorf("did not expect an error for region %s, got: %v", test.region, err) - } - if endpoint != test.expectURL { - t.Errorf("expected URL %s, got %s", test.expectURL, endpoint) + if tc.expectErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) } + assert.Equal(t, endpoint, tc.expectURL) }) } } func TestGetBulkApi(t *testing.T) { - service := &ClevertapServiceImpl{} destConfig := DestinationConfig{Region: "IN"} + assert.Equal(t, "IN", destConfig.Region) - bulkApi := service.getBulkApi(destConfig) - if bulkApi == nil { - t.Fatal("expected a non-nil bulk API service") - } - if bulkApi.BulkApi == "" { - t.Error("expected a non-empty BulkApi URL") - } + endpoints, err := GetBulkApi(destConfig) + assert.Nil(t, err) + assert.NotNil(t, endpoints) + assert.NotEmpty(t, endpoints.BulkApi) + assert.NotEmpty(t, endpoints.NotifyApi) + // TODO: Validate endpoints.BulkApi and endpoints.NotifyApi } func TestConvertToConnectionConfig(t *testing.T) { + type expected struct { + senderName string + isErr bool + } tests := []struct { - conn *backendconfig.Connection - expectErr bool + conn *backendconfig.Connection + expected expected }{ - {nil, true}, - {&backendconfig.Connection{ - SourceID: "source123", - DestinationID: "destination456", - Enabled: true, - Config: map[string]interface{}{ - "Destination": map[string]interface{}{ - "SchemaVersion": "v1.0", - "SegmentName": "User Segment A", - "AdminEmail": "admin@example.com", - "SenderName": "Rudderstack", + { + conn: &backendconfig.Connection{ + SourceID: "source123", + DestinationID: "destination456", + Enabled: true, + Config: nil, + }, + expected: expected{ + isErr: true, + }, + }, + { + conn: &backendconfig.Connection{ + SourceID: "source123", + DestinationID: "destination456", + Enabled: true, + Config: map[string]interface{}{ + "Destination": map[string]interface{}{ + "SchemaVersion": "v1.0", + "SegmentName": "User Segment A", + "AdminEmail": "admin@example.com", + "SenderName": "Rudderstack", + }, }, }, - }, false}, + expected: expected{ + senderName: "Rudderstack", + isErr: false, + }, + }, } - for _, test := range tests { + for _, tc := range tests { t.Run("", func(t *testing.T) { - service := &ClevertapServiceImpl{} - connConfig, err := service.convertToConnectionConfig(test.conn) + connConfig, err := ConvertToConnectionConfig(tc.conn) - if test.expectErr && err == nil { - t.Error("expected an error, got none") - } - if !test.expectErr && err != nil { - t.Errorf("did not expect an error, got: %v", err) + if tc.expected.isErr { + assert.Error(t, err) + assert.Equal(t, "", connConfig.Config.Destination.SenderName) + return } - if !test.expectErr && connConfig.Config.Destination.SenderName != DEFAULT_SENDER_NAME { - t.Errorf("expected SenderName to be set to default, got: %s", connConfig.Config.Destination.SenderName) + assert.NoError(t, err) + + if connConfig.Config.Destination.SenderName == "" { + assert.Equal(t, DEFAULT_SENDER_NAME, connConfig.Config.Destination.SenderName) + } else { + assert.Equal(t, tc.expected.senderName, connConfig.Config.Destination.SenderName) } }) } From c98f393100e3557274a09f0e75b361dd84d0fb70 Mon Sep 17 00:00:00 2001 From: Sai Sankeerth Date: Wed, 11 Dec 2024 13:37:55 +0530 Subject: [PATCH 08/10] chore: naming changes - 1 - modify error test case while converting connection config --- .../clevertap_segment/manager.go | 6 +++--- .../clevertap_segment/upload.go | 7 +++---- .../clevertap_segment/utils.go | 16 ++++++++-------- .../clevertap_segment/utils_test.go | 13 +++++++------ 4 files changed, 21 insertions(+), 21 deletions(-) diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go index c63524360d..dd2cf23ab1 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/manager.go @@ -42,19 +42,19 @@ func NewClevertapBulkUploader( func NewManager(logger logger.Logger, statsFactory stats.Stats, destination *backendconfig.DestinationT, connection *backendconfig.Connection) (common.AsyncDestinationManager, error) { var destConfig DestinationConfig - destConfig, err := Convert[map[string]interface{}, DestinationConfig](destination.Config) + destConfig, err := convert[map[string]interface{}, DestinationConfig](destination.Config) if err != nil { return nil, fmt.Errorf("error in converting destination config: %v", err) } - clevertapConnectionConfig, err := ConvertToConnectionConfig(connection) + clevertapConnectionConfig, err := convertToConnectionConfig(connection) if err != nil { return nil, fmt.Errorf("error converting to connection config for clevertap segment: %v", err) } destName := destination.DestinationDefinition.Name clevertapService := &ClevertapServiceImpl{} - endpoints, err := GetBulkApi(destConfig) + endpoints, err := getBulkApi(destConfig) if err != nil { return nil, fmt.Errorf("error getting bulk api for clevertap segment: %v", err) } diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/upload.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/upload.go index 23df1866b5..7c68129673 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/upload.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/upload.go @@ -36,7 +36,7 @@ func (*ClevertapBulkUploader) Transform(job *jobsdb.JobT) (string, error) { return common.GetMarshalledData(string(job.EventPayload), job.JobID) } -func (u *ClevertapBulkUploader) PopulateCsvFile(actionFile *ActionFileInfo, line string, data Data) error { +func (u *ClevertapBulkUploader) populateCsvFile(actionFile *ActionFileInfo, line string, data Data) error { newFileSize := actionFile.FileSize + int64(len(line)) if newFileSize < u.fileSizeLimit { actionFile.FileSize = newFileSize @@ -74,7 +74,6 @@ func (u *ClevertapBulkUploader) PopulateCsvFile(actionFile *ActionFileInfo, line actionFile.CSVWriter.Flush() actionFile.SuccessfulJobIDs = append(actionFile.SuccessfulJobIDs, data.Metadata.JobID) } else { - // fmt.println("size exceeding") actionFile.FailedJobIDs = append(actionFile.FailedJobIDs, data.Metadata.JobID) } return nil @@ -95,7 +94,7 @@ func (u *ClevertapBulkUploader) createCSVFile(existingFilePath string) (*ActionF csvFilePath := fmt.Sprintf("%v.csv", path) // Initialize the CSV writer with the generated file path - actionFile, err := CreateCSVWriter(csvFilePath) + actionFile, err := createCSVWriter(csvFilePath) if err != nil { return nil, err } @@ -134,7 +133,7 @@ func (u *ClevertapBulkUploader) createCSVFile(existingFilePath string) (*ActionF payloadSizeStat.Observe(float64(len(data.Message.Fields))) // Populate the CSV file and collect success/failure job IDs - err := u.PopulateCsvFile(actionFile, line, data) + err := u.populateCsvFile(actionFile, line, data) if err != nil { // Collect the failed job ID only if it's valid if data.Metadata.JobID != 0 { // Check if JobID is not zero diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go index 4797ecbee3..9a74ec70fe 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils.go @@ -9,7 +9,7 @@ import ( backendconfig "github.com/rudderlabs/rudder-server/backend-config" ) -func CreateCSVWriter(fileName string) (*ActionFileInfo, error) { +func createCSVWriter(fileName string) (*ActionFileInfo, error) { // Open or create the file where the CSV will be written file, err := os.Create(fileName) if err != nil { @@ -27,8 +27,8 @@ func CreateCSVWriter(fileName string) (*ActionFileInfo, error) { }, nil } -func GetBulkApi(destConfig DestinationConfig) (*Endpoints, error) { - endpoint, err := GetCleverTapEndpoint(destConfig.Region) +func getBulkApi(destConfig DestinationConfig) (*Endpoints, error) { + endpoint, err := getCleverTapEndpoint(destConfig.Region) if err != nil { return nil, err } @@ -38,8 +38,8 @@ func GetBulkApi(destConfig DestinationConfig) (*Endpoints, error) { }, nil } -// GetCleverTapEndpoint returns the API endpoint for the given region -func GetCleverTapEndpoint(region string) (string, error) { +// getCleverTapEndpoint returns the API endpoint for the given region +func getCleverTapEndpoint(region string) (string, error) { // Mapping of regions to endpoints endpoints := map[string]string{ "IN": "in1.api.clevertap.com", @@ -67,7 +67,7 @@ type ConfigOutput interface { } // Generic function to convert input to output using JSON marshal/unmarshal -func Convert[T any, U ConfigOutput](input T) (U, error) { +func convert[T any, U ConfigOutput](input T) (U, error) { var output U // Marshal the input to JSON @@ -85,8 +85,8 @@ func Convert[T any, U ConfigOutput](input T) (U, error) { return output, nil } -func ConvertToConnectionConfig(conn *backendconfig.Connection) (ConnectionConfig, error) { - connConfig, err := Convert[map[string]interface{}, ConnectionConfig](conn.Config) +func convertToConnectionConfig(conn *backendconfig.Connection) (ConnectionConfig, error) { + connConfig, err := convert[map[string]interface{}, ConnectionConfig](conn.Config) if err != nil { return ConnectionConfig{}, fmt.Errorf("failed to convert to connection config: %w", err) } diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils_test.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils_test.go index 38cdd473f7..bceea4bd5a 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils_test.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils_test.go @@ -22,7 +22,7 @@ func TestGetCleverTapEndpoint(t *testing.T) { for _, tc := range tests { t.Run(tc.region, func(t *testing.T) { - endpoint, err := GetCleverTapEndpoint(tc.region) + endpoint, err := getCleverTapEndpoint(tc.region) if tc.expectErr { assert.Error(t, err) @@ -38,7 +38,7 @@ func TestGetBulkApi(t *testing.T) { destConfig := DestinationConfig{Region: "IN"} assert.Equal(t, "IN", destConfig.Region) - endpoints, err := GetBulkApi(destConfig) + endpoints, err := getBulkApi(destConfig) assert.Nil(t, err) assert.NotNil(t, endpoints) assert.NotEmpty(t, endpoints.BulkApi) @@ -60,7 +60,9 @@ func TestConvertToConnectionConfig(t *testing.T) { SourceID: "source123", DestinationID: "destination456", Enabled: true, - Config: nil, + Config: map[string]interface{}{ + "invalidKey": make(chan int), // Channels cannot be marshaled to JSON + }, }, expected: expected{ isErr: true, @@ -82,22 +84,21 @@ func TestConvertToConnectionConfig(t *testing.T) { }, expected: expected{ senderName: "Rudderstack", - isErr: false, }, }, } for _, tc := range tests { t.Run("", func(t *testing.T) { - connConfig, err := ConvertToConnectionConfig(tc.conn) + connConfig, err := convertToConnectionConfig(tc.conn) if tc.expected.isErr { assert.Error(t, err) assert.Equal(t, "", connConfig.Config.Destination.SenderName) return } - assert.NoError(t, err) + assert.NoError(t, err) if connConfig.Config.Destination.SenderName == "" { assert.Equal(t, DEFAULT_SENDER_NAME, connConfig.Config.Destination.SenderName) } else { From db5884d97f7f90ca95814e3433a3b3430ecbcda4 Mon Sep 17 00:00:00 2001 From: Sai Sankeerth Date: Wed, 11 Dec 2024 13:52:08 +0530 Subject: [PATCH 09/10] chore: input name update --- .../clevertap_segment/upload.go | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/upload.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/upload.go index 7c68129673..ef9f72de95 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/upload.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/upload.go @@ -79,7 +79,7 @@ func (u *ClevertapBulkUploader) populateCsvFile(actionFile *ActionFileInfo, line return nil } -func (u *ClevertapBulkUploader) createCSVFile(existingFilePath string) (*ActionFileInfo, error) { +func (u *ClevertapBulkUploader) createCSVFile(inputDataFilePath string) (*ActionFileInfo, error) { // Create a temporary directory using misc.CreateTMPDIR tmpDirPath, err := misc.CreateTMPDIR() if err != nil { @@ -100,17 +100,17 @@ func (u *ClevertapBulkUploader) createCSVFile(existingFilePath string) (*ActionF } defer actionFile.File.Close() // Ensure the file is closed when done - // Store the CSV file path in the ActionFileInfo struct - actionFile.CSVFilePath = csvFilePath + // // Store the CSV file path in the ActionFileInfo struct + // actionFile.CSVFilePath = csvFilePath // Create a scanner to read the existing file line by line - existingFile, err := os.Open(existingFilePath) + inputFile, err := os.Open(inputDataFilePath) if err != nil { return nil, fmt.Errorf("failed to open existing file") } - defer existingFile.Close() + defer inputFile.Close() - scanner := bufio.NewScanner(existingFile) + scanner := bufio.NewScanner(inputFile) scanner.Buffer(nil, 50000*1024) // Adjust the buffer size if necessary for scanner.Scan() { @@ -180,24 +180,27 @@ func (u *ClevertapBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS DestinationID: destination.ID, } } - uploadRetryableStat := u.statsFactory.NewTaggedStat("events_over_prescribed_limit", stats.CountType, map[string]string{ - "module": "batch_router", - "destType": u.destName, + eventsOverLimit := u.statsFactory.NewTaggedStat("events_over_prescribed_limit", stats.CountType, map[string]string{ + "module": "batch_router", + "destType": u.destName, + "destinationID": destination.ID, }) - uploadRetryableStat.Count(len(actionFiles.FailedJobIDs)) + eventsOverLimit.Count(len(actionFiles.FailedJobIDs)) uploadTimeStat := u.statsFactory.NewTaggedStat("async_upload_time", stats.TimerType, map[string]string{ - "module": "batch_router", - "destType": u.destName, + "module": "batch_router", + "destType": u.destName, + "destinationID": destination.ID, }) presignedURL, urlErr := u.getPresignedS3URL(u.appKey, u.accessToken) // API if urlErr != nil { eventsAbortedStat := u.statsFactory.NewTaggedStat("failed_job_count", stats.CountType, map[string]string{ - "module": "batch_router", - "destType": u.destName, + "module": "batch_router", + "destType": u.destName, + "destinationID": destination.ID, }) eventsAbortedStat.Count(len(asyncDestStruct.ImportingJobIDs)) return common.AsyncUploadOutput{ From 2170f58c0275c85a703fef7031328072174870a9 Mon Sep 17 00:00:00 2001 From: shrouti1507 Date: Wed, 11 Dec 2024 14:34:46 +0530 Subject: [PATCH 10/10] fix: adding minor test cases --- .../asyncdestinationmanager/clevertap_segment/upload.go | 2 +- .../asyncdestinationmanager/clevertap_segment/utils_test.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/upload.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/upload.go index ef9f72de95..2af23b80c5 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/upload.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/upload.go @@ -111,7 +111,7 @@ func (u *ClevertapBulkUploader) createCSVFile(inputDataFilePath string) (*Action defer inputFile.Close() scanner := bufio.NewScanner(inputFile) - scanner.Buffer(nil, 50000*1024) // Adjust the buffer size if necessary + scanner.Buffer(nil, 50000*1024) // maximum size of a single line that the scanner can read. Adjust the buffer size if necessary for scanner.Scan() { line := scanner.Text() diff --git a/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils_test.go b/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils_test.go index bceea4bd5a..534681acfb 100644 --- a/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils_test.go +++ b/router/batchrouter/asyncdestinationmanager/clevertap_segment/utils_test.go @@ -43,7 +43,8 @@ func TestGetBulkApi(t *testing.T) { assert.NotNil(t, endpoints) assert.NotEmpty(t, endpoints.BulkApi) assert.NotEmpty(t, endpoints.NotifyApi) - // TODO: Validate endpoints.BulkApi and endpoints.NotifyApi + assert.Equal(t, "https://in1.api.clevertap.com/get_custom_list_segment_url", endpoints.BulkApi) + assert.Equal(t, "https://in1.api.clevertap.com/upload_custom_list_segment_completed", endpoints.NotifyApi) } func TestConvertToConnectionConfig(t *testing.T) {