diff --git a/enterprise/reporting/error_reporting.go b/enterprise/reporting/error_reporting.go index 28f7e3743c..6a4c005443 100644 --- a/enterprise/reporting/error_reporting.go +++ b/enterprise/reporting/error_reporting.go @@ -28,7 +28,7 @@ import ( "github.com/rudderlabs/rudder-go-kit/stats/collectors" obskit "github.com/rudderlabs/rudder-observability-kit/go/labels" - + "github.com/rudderlabs/rudder-server/enterprise/reporting/event_sampler" migrator "github.com/rudderlabs/rudder-server/services/sql-migrator" "github.com/rudderlabs/rudder-server/utils/httputil" . "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck @@ -87,16 +87,14 @@ type ErrorDetailReporter struct { minReportedAtQueryTime stats.Measurement errorDetailReportsQueryTime stats.Measurement edReportingRequestLatency stats.Measurement + eventSamplingEnabled config.ValueLoader[bool] + eventSamplingDuration config.ValueLoader[time.Duration] + eventSampler event_sampler.EventSampler stats stats.Stats config *config.Config } -type errorDetails struct { - ErrorCode string - ErrorMessage string -} - func NewErrorDetailReporter( ctx context.Context, configSubscriber *configSubscriber, @@ -112,11 +110,26 @@ func NewErrorDetailReporter( sleepInterval := conf.GetReloadableDurationVar(30, time.Second, "Reporting.sleepInterval") maxConcurrentRequests := conf.GetReloadableIntVar(32, 1, "Reporting.maxConcurrentRequests") maxOpenConnections := conf.GetIntVar(16, 1, "Reporting.errorReporting.maxOpenConnections") + eventSamplingEnabled := conf.GetReloadableBoolVar(false, "Reporting.errorReporting.eventSampling.enabled") + eventSamplingDuration := conf.GetReloadableDurationVar(60, time.Minute, "Reporting.eventSampling.durationInMinutes") + eventSamplerType := conf.GetReloadableStringVar("badger", "Reporting.eventSampling.type") + eventSamplingCardinality := conf.GetReloadableIntVar(100000, 1, "Reporting.eventSampling.cardinality") log := logger.NewLogger().Child("enterprise").Child("error-detail-reporting") extractor := NewErrorDetailExtractor(log) ctx, cancel := context.WithCancel(ctx) g, ctx := errgroup.WithContext(ctx) + + var eventSampler event_sampler.EventSampler + + if eventSamplingEnabled.Load() { + var err error + eventSampler, err = event_sampler.NewEventSampler(ctx, eventSamplingDuration, eventSamplerType, eventSamplingCardinality, event_sampler.BadgerEventSamplerErrorsPathName, conf, log) + if err != nil { + panic(err) + } + } + return &ErrorDetailReporter{ ctx: ctx, cancel: cancel, @@ -129,6 +142,10 @@ func NewErrorDetailReporter( vacuumFull: conf.GetReloadableBoolVar(true, "Reporting.errorReporting.vacuumFull", "Reporting.vacuumFull"), httpClient: netClient, + eventSamplingEnabled: eventSamplingEnabled, + eventSamplingDuration: eventSamplingDuration, + eventSampler: eventSampler, + namespace: config.GetKubeNamespace(), instanceID: conf.GetString("INSTANCE_ID", "1"), region: conf.GetString("region", ""), @@ -201,7 +218,7 @@ func (edr *ErrorDetailReporter) GetSyncer(syncerKey string) *types.SyncSource { return edr.syncers[syncerKey] } -func shouldReport(metric *types.PUReportedMetric) bool { +func shouldReport(metric types.PUReportedMetric) bool { switch { case metric.StatusDetail.StatusCode >= http.StatusBadRequest, metric.StatusDetail.StatusCode == types.FilterEventCode, metric.StatusDetail.StatusCode == types.SuppressEventCode: return true @@ -225,12 +242,12 @@ func (edr *ErrorDetailReporter) Report(ctx context.Context, metrics []*types.PUR reportedAt := time.Now().UTC().Unix() / 60 for _, metric := range metrics 
{ + metric := *metric if !shouldReport(metric) { continue } workspaceID := edr.configSubscriber.WorkspaceIDFromSource(metric.ConnectionDetails.SourceID) - metric := *metric if edr.IsPIIReportingDisabled(workspaceID) { edr.log.Debugn("PII setting is disabled for workspaceId:", obskit.WorkspaceID(workspaceID)) return nil @@ -239,16 +256,23 @@ func (edr *ErrorDetailReporter) Report(ctx context.Context, metrics []*types.PUR edr.log.Debugn("DestinationId & DestDetail details", obskit.DestinationID(metric.ConnectionDetails.DestinationID), logger.NewField("destinationDetail", destinationDetail)) // extract error-message & error-code - errDets := edr.extractErrorDetails(metric.StatusDetail.SampleResponse, metric.StatusDetail.StatTags) + metric.StatusDetail.ErrorDetails = edr.extractErrorDetails(metric.StatusDetail.SampleResponse, metric.StatusDetail.StatTags) edr.stats.NewTaggedStat("error_detail_reporting_failures", stats.CountType, stats.Tags{ - "errorCode": errDets.ErrorCode, + "errorCode": metric.StatusDetail.ErrorDetails.Code, "workspaceId": workspaceID, "destType": destinationDetail.destType, "sourceId": metric.ConnectionDetails.SourceID, "destinationId": metric.ConnectionDetails.DestinationID, }).Count(int(metric.StatusDetail.Count)) + if edr.eventSamplingEnabled.Load() { + metric, err = transformMetricWithEventSampling(metric, reportedAt, edr.eventSampler, int64(edr.eventSamplingDuration.Load().Minutes())) + if err != nil { + return err + } + } + _, err = stmt.Exec( workspaceID, edr.namespace, @@ -263,8 +287,8 @@ func (edr *ErrorDetailReporter) Report(ctx context.Context, metrics []*types.PUR metric.StatusDetail.Count, metric.StatusDetail.StatusCode, metric.StatusDetail.EventType, - errDets.ErrorCode, - errDets.ErrorMessage, + metric.StatusDetail.ErrorDetails.Code, + metric.StatusDetail.ErrorDetails.Message, metric.StatusDetail.SampleResponse, string(metric.StatusDetail.SampleEvent), metric.StatusDetail.EventName, @@ -312,13 +336,13 @@ func (edr *ErrorDetailReporter) migrate(c types.SyncerConfig) (*sql.DB, error) { return dbHandle, nil } -func (edr *ErrorDetailReporter) extractErrorDetails(sampleResponse string, statTags map[string]string) errorDetails { +func (edr *ErrorDetailReporter) extractErrorDetails(sampleResponse string, statTags map[string]string) types.ErrorDetails { errMsg := edr.errorDetailExtractor.GetErrorMessage(sampleResponse) cleanedErrMsg := edr.errorDetailExtractor.CleanUpErrorMessage(errMsg) errorCode := edr.errorDetailExtractor.GetErrorCode(cleanedErrMsg, statTags) - return errorDetails{ - ErrorMessage: cleanedErrMsg, - ErrorCode: errorCode, + return types.ErrorDetails{ + Message: cleanedErrMsg, + Code: errorCode, } } @@ -776,4 +800,7 @@ func (edr *ErrorDetailReporter) sendMetric(ctx context.Context, label string, me func (edr *ErrorDetailReporter) Stop() { edr.cancel() _ = edr.g.Wait() + if edr.eventSampler != nil { + edr.eventSampler.Close() + } } diff --git a/enterprise/reporting/error_reporting_test.go b/enterprise/reporting/error_reporting_test.go index 58c0c68f1e..fccf408f89 100644 --- a/enterprise/reporting/error_reporting_test.go +++ b/enterprise/reporting/error_reporting_test.go @@ -2,6 +2,7 @@ package reporting import ( "context" + "fmt" "net/http" "testing" @@ -19,6 +20,7 @@ import ( "github.com/rudderlabs/rudder-server/utils/types" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" utilsTx "github.com/rudderlabs/rudder-server/utils/tx" ) @@ -27,7 +29,7 @@ func TestShouldReport(t *testing.T) { RegisterTestingT(t) // Test case 1: 
Event failure case - metric1 := &types.PUReportedMetric{ + metric1 := types.PUReportedMetric{ StatusDetail: &types.StatusDetail{ StatusCode: http.StatusBadRequest, }, @@ -35,7 +37,7 @@ func TestShouldReport(t *testing.T) { Expect(shouldReport(metric1)).To(BeTrue()) // Test case 2: Filter event case - metric2 := &types.PUReportedMetric{ + metric2 := types.PUReportedMetric{ StatusDetail: &types.StatusDetail{ StatusCode: types.FilterEventCode, }, @@ -43,7 +45,7 @@ func TestShouldReport(t *testing.T) { Expect(shouldReport(metric2)).To(BeTrue()) // Test case 3: Suppress event case - metric3 := &types.PUReportedMetric{ + metric3 := types.PUReportedMetric{ StatusDetail: &types.StatusDetail{ StatusCode: types.SuppressEventCode, }, @@ -51,7 +53,7 @@ func TestShouldReport(t *testing.T) { Expect(shouldReport(metric3)).To(BeTrue()) // Test case 4: Success cases - metric4 := &types.PUReportedMetric{ + metric4 := types.PUReportedMetric{ StatusDetail: &types.StatusDetail{ StatusCode: http.StatusOK, }, @@ -59,7 +61,30 @@ func TestShouldReport(t *testing.T) { Expect(shouldReport(metric4)).To(BeFalse()) } -func TestErrorDetailReporter_Report(t *testing.T) { +func TestCleanUpErrorMessage(t *testing.T) { + ext := NewErrorDetailExtractor(logger.NOP) + type testCase struct { + inputStr string + expected string + } + + testCases := []testCase{ + {inputStr: "Object with ID '123983489734' is not a valid object", expected: "Object with ID is not a valid object"}, + {inputStr: "http://xyz-rudder.com/v1/endpoint not reachable: context deadline exceeded", expected: " not reachable context deadline exceeded"}, + {inputStr: "http://xyz-rudder.com/v1/endpoint not reachable 172.22.22.10: EOF", expected: " not reachable EOF"}, + {inputStr: "Request failed to process from 16-12-2022:19:30:23T+05:30 due to internal server error", expected: "Request failed to process from due to internal server error"}, + {inputStr: "User with email 'vagor12@bing.com' is not valid", expected: "User with email is not valid"}, + {inputStr: "Allowed timestamp is [15 minutes] into the future", expected: "Allowed timestamp is minutes into the future"}, + } + for i, tCase := range testCases { + t.Run(fmt.Sprintf("Case-%d", i), func(t *testing.T) { + actual := ext.CleanUpErrorMessage(tCase.inputStr) + require.Equal(t, tCase.expected, actual) + }) + } +} + +func TestErrorDetailsReport(t *testing.T) { db, dbMock, err := sqlmock.New() if err != nil { t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) @@ -222,3 +247,450 @@ func TestErrorDetailReporter_Report(t *testing.T) { assert.NoError(t, err) } } + +func TestGetErrorMessageFromResponse(t *testing.T) { + ext := NewErrorDetailExtractor(logger.NOP) + + for i, tc := range tcs { + t.Run(fmt.Sprintf("payload-%v", i), func(t *testing.T) { + msg := ext.GetErrorMessage(tc.inputStr) + require.Equal(t, tc.expected, msg) + }) + } +} + +func TestExtractErrorDetails(t *testing.T) { + type depTcOutput struct { + errorMsg string + errorCode string + } + type depTc struct { + caseDescription string + inputErrMsg string + output depTcOutput + statTags map[string]string + } + testCases := []depTc{ + { + caseDescription: "should validate the deprecation correctly", + inputErrMsg: "Offline Conversions API is deprecated from onwards. 
Please use Conversions API, which is the latest version that supports Offline Conversions API and can be used until.", + output: depTcOutput{ + errorMsg: "Offline Conversions API is deprecated from onwards Please use Conversions API which is the latest version that supports Offline Conversions API and can be used until ", + errorCode: "deprecation", + }, + }, + { + caseDescription: "should validate the deprecation correctly even though we have upper-case keywords", + inputErrMsg: "Offline Conversions API is DeprEcated from onwards. Please use Conversions API, which is the latest version that supports Offline Conversions API and can be used until.", + output: depTcOutput{ + errorMsg: "Offline Conversions API is DeprEcated from onwards Please use Conversions API which is the latest version that supports Offline Conversions API and can be used until ", + errorCode: "deprecation", + }, + }, + { + caseDescription: "should use statTags to compute errorCode", + statTags: map[string]string{ + "errorCategory": "dataValidation", + "errorType": "configuration", + }, + inputErrMsg: "Some error", + output: depTcOutput{ + errorMsg: "Some error", + errorCode: "dataValidation:configuration", + }, + }, + } + + edr := NewErrorDetailReporter(context.Background(), &configSubscriber{}, stats.NOP, config.Default) + for _, tc := range testCases { + t.Run(tc.caseDescription, func(t *testing.T) { + errorDetails := edr.extractErrorDetails(tc.inputErrMsg, tc.statTags) + + require.Equal(t, tc.output.errorMsg, errorDetails.Message) + require.Equal(t, tc.output.errorCode, errorDetails.Code) + }) + } +} + +type getValTc struct { + inputStr string + expected string +} + +var tcs = []getValTc{ + { + inputStr: `{"response":"{\"message\":\"Primary key 'Contact Key' does not exist.\",\"errorcode\":10000,\"documentation\":\"\"}"}`, + expected: "Primary key 'Contact Key' does not exist.", + }, + { + inputStr: `{"response":"Event Edit_Order_Button_Clicked doesn't match with Snapchat Events!","firstAttemptedAt":"2023-03-30T16:58:05.628Z","content-type":"application/json"}`, + expected: "Event Edit_Order_Button_Clicked doesn't match with Snapchat Events!", + }, + { + inputStr: `{"response":"{\"status\":400,\"message\":\"Failed with Unsupported post request. Object with ID '669556453669016' does not exist, cannot be loaded due to missing permissions, or does not support this operation. Please read the Graph API documentation at https://developers.facebook.com/docs/graph-api during response transformation\",\"destinationResponse\":{\"error\":{\"message\":\"Unsupported post request. Object with ID '669556453669016' does not exist, cannot be loaded due to missing permissions, or does not support this operation. Please read the Graph API documentation at https://developers.facebook.com/docs/graph-api\",\"type\":\"GraphMethodException\",\"code\":100,\"error_subcode\":33,\"fbtrace_id\":\"AAjsXHCiypjAV50Vg-dZx4D\"},\"status\":400},\"statTags\":{\"errorCategory\":\"network\",\"errorType\":\"aborted\",\"destType\":\"FACEBOOK_PIXEL\",\"module\":\"destination\",\"implementation\":\"native\",\"feature\":\"dataDelivery\",\"destinationId\":\"2EXmEugZiMykYfFzoPnXjq6bJ6D\",\"workspaceId\":\"1vTJeDNwZx4bJF7c5ZUWPqJpMVx\",\"context\":\"[Native Integration Service] Failure During Processor Transform\"}}","firstAttemptedAt":"2023-03-30T17:39:17.638Z","content-type":"application/json"}`, + expected: "Unsupported post request. 
Object with ID '669556453669016' does not exist, cannot be loaded due to missing permissions, or does not support this operation. Please read the Graph API documentation at https://developers.facebook.com/docs/graph-api", + }, + { + inputStr: `{"response":"{\n \"meta\": {\n \"error\": \"Unauthorized request\"\n }\n}\n","firstAttemptedAt":"2023-03-30T17:39:00.377Z","content-type":"application/json; charset=utf-8"}`, + expected: "Unauthorized request", + }, + { + inputStr: `TypeError: Cannot set property 'event_name' of undefined`, + expected: `TypeError: Cannot set property 'event_name' of undefined`, + }, + { + inputStr: `{"response":"{\"code\":400,\"error\":\"Invalid API key: 88ea3fd9e15f74491ac6dd401c8733c9\"}\r\r\r\n","firstAttemptedAt":"2023-03-30T17:39:36.755Z","content-type":"application/json"}`, + expected: `Invalid API key: 88ea3fd9e15f74491ac6dd401c8733c9`, + }, + { + inputStr: `{"response":"{\"status\":\"error\",\"message\":\"Invalid input JSON on line 1, column 54: Cannot deserialize value of type ` + "`" + `java.lang.String` + "`" + ` from Object value (token ` + "`" + `JsonToken.START_OBJECT` + "`" + `)\",\"correlationId\":\"c73c9759-e9fe-4061-8da6-3d7a10159f54\"}","firstAttemptedAt":"2023-03-30T17:39:12.397Z","content-type":"application/json;charset=utf-8"}`, + expected: `Invalid input JSON on line 1, column 54: Cannot deserialize value of type ` + "`" + `java.lang.String` + "`" + ` from Object value (token ` + "`" + `JsonToken.START_OBJECT` + "`" + `)`, + }, + { + inputStr: `{"response":"{\"error\":\"Event request failed (Invalid callback parameters)\"}","firstAttemptedAt":"2023-03-30T17:38:36.152Z","content-type":"application/json; charset=utf-8"}`, + expected: `Event request failed (Invalid callback parameters)`, + }, + { + inputStr: `{"response":"{\"type\":\"error.list\",\"request_id\":\"0000ib5themn9npuifdg\",\"errors\":[{\"code\":\"not_found\",\"message\":\"User Not Found\"}]}","firstAttemptedAt":"2023-03-30T17:38:47.857Z","content-type":"application/json; charset=utf-8"}`, + expected: `User Not Found`, + }, + { + inputStr: `{"response":"{\"type\":\"error.list\",\"request_id\":\"0000ohm9i1s3euaavmmg\",\"errors\":[{\"code\":\"unauthorized\",\"message\":\"Access Token Invalid\"}]}","firstAttemptedAt":"2023-03-30T16:59:37.973Z","content-type":"application/json; charset=utf-8"}`, + expected: `Access Token Invalid`, + }, + { + inputStr: `{"response":"[CDM GOOGLESHEETS] Unable to create client for 21xmYbMXCovrqQn5BIszNvcYmur circuit breaker is open, last error: [GoogleSheets] error :: [GoogleSheets] error :: error in GoogleSheets while unmarshalling credentials json:: invalid character 'v' looking for beginning of value","firstAttemptedAt":"2023-03-30T16:58:32.010Z","content-type":""}`, + expected: `[CDM GOOGLESHEETS] Unable to create client for 21xmYbMXCovrqQn5BIszNvcYmur circuit breaker is open, last error: [GoogleSheets] error :: [GoogleSheets] error :: error in GoogleSheets while unmarshalling credentials json:: invalid character 'v' looking for beginning of value`, + }, + { + inputStr: `{"response":"{\n \"meta\": {\n \"errors\": [\n \"id attribute and identifier are not the same value\",\n \"value for attribute 'token' cannot be longer than 1000 bytes\"\n ]\n }\n}\n","firstAttemptedAt":"2023-03-30T17:31:01.599Z","content-type":"application/json; charset=utf-8"}`, + expected: `id attribute and identifier are not the same value.value for attribute 'token' cannot be longer than 1000 bytes`, + }, + { + inputStr: `{"response":"{\"errors\":[\"No users with this external_id 
found\"]}","firstAttemptedAt":"2023-03-30T17:37:58.184Z","content-type":"application/json; charset=utf-8"}`, + expected: `No users with this external_id found`, + }, + { + inputStr: `{"response":"{\"status\":500,\"destinationResponse\":{\"response\":\"[ECONNRESET] :: Connection reset by peer\",\"status\":500,\"rudderJobMetadata\":{\"jobId\":2942260148,\"attemptNum\":0,\"userId\":\"\",\"sourceId\":\"1qWeSkIiAhg3O96KzVVU5MieZHM\",\"destinationId\":\"2IpcvzcMgbLfXgGA4auDrtfEMuI\",\"workspaceId\":\"1nnEnc0tt7k9eFf7bz1GsiU0MFC\",\"secret\":null}},\"message\":\"[GA4 Response Handler] Request failed for destination ga4 with status: 500\",\"statTags\":{\"errorCategory\":\"network\",\"errorType\":\"retryable\",\"destType\":\"GA4\",\"module\":\"destination\",\"implementation\":\"native\",\"feature\":\"dataDelivery\",\"destinationId\":\"2IpcvzcMgbLfXgGA4auDrtfEMuI\",\"workspaceId\":\"1nnEnc0tt7k9eFf7bz1GsiU0MFC\"}}","firstAttemptedAt":"2023-03-30T17:22:26.488Z","content-type":"application/json"}`, + expected: `[GA4 Response Handler] Request failed for destination ga4 with status: 500`, + }, + { + inputStr: `{"response":"{\"status\":502,\"destinationResponse\":{\"response\":\"\u003chtml\u003e\\r\\n\u003chead\u003e\u003ctitle\u003e502 Bad Gateway\u003c/title\u003e\u003c/head\u003e\\r\\n\u003cbody\u003e\\r\\n\u003ccenter\u003e\u003ch1\u003e502 Bad Gateway\u003c/h1\u003e\u003c/center\u003e\\r\\n\u003c/body\u003e\\r\\n\u003c/html\u003e\\r\\n\",\"status\":502,\"rudderJobMetadata\":{\"jobId\":4946423471,\"attemptNum\":0,\"userId\":\"\",\"sourceId\":\"2CRIZp4OS10sjjGF3uF4guZuGmj\",\"destinationId\":\"2DstxLWX7Oi7gPdPM1CR9CikOko\",\"workspaceId\":\"1yaBlqltp5Y4V2NK8qePowlyafu\",\"secret\":null}},\"message\":\"Request failed with status: 502\",\"statTags\":{\"errorCategory\":\"network\",\"errorType\":\"retryable\",\"destType\":\"CLEVERTAP\",\"module\":\"destination\",\"implementation\":\"native\",\"feature\":\"dataDelivery\",\"destinationId\":\"2DstxLWX7Oi7gPdPM1CR9CikOko\",\"workspaceId\":\"1yaBlqltp5Y4V2NK8qePowlyafu\"}}","firstAttemptedAt":"2023-03-30T17:11:55.326Z","content-type":"application/json"}`, + expected: "Request failed with status: 502", + }, + { + inputStr: `{"response":"{\"status\":400,\"destinationResponse\":{\"response\":\"\u003c!DOCTYPE html\u003e\\n\u003chtml lang=en\u003e\\n \u003cmeta charset=utf-8\u003e\\n \u003cmeta name=viewport content=\\\"initial-scale=1, minimum-scale=1, width=device-width\\\"\u003e\\n \u003ctitle\u003eError 400 (Bad Request)!!1\u003c/title\u003e\\n \u003cstyle\u003e\\n *{margin:0;padding:0}html,code{font:15px/22px arial,sans-serif}html{background:#fff;color:#222;padding:15px}body{margin:7% auto 0;max-width:390px;min-height:180px;padding:30px 0 15px}* \u003e body{background:url(//www.google.com/images/errors/robot.png) 100% 5px no-repeat;padding-right:205px}p{margin:11px 0 22px;overflow:hidden}ins{color:#777;text-decoration:none}a img{border:0}@media screen and (max-width:772px){body{background:none;margin-top:0;max-width:none;padding-right:0}}#logo{background:url(//www.google.com/images/branding/googlelogo/1x/googlelogo_color_150x54dp.png) no-repeat;margin-left:-5px}@media only screen and (min-resolution:192dpi){#logo{background:url(//www.google.com/images/branding/googlelogo/2x/googlelogo_color_150x54dp.png) no-repeat 0% 0%/100% 100%;-moz-border-image:url(//www.google.com/images/branding/googlelogo/2x/googlelogo_color_150x54dp.png) 0}}@media only screen and 
(-webkit-min-device-pixel-ratio:2){#logo{background:url(//www.google.com/images/branding/googlelogo/2x/googlelogo_color_150x54dp.png) no-repeat;-webkit-background-size:100% 100%}}#logo{display:inline-block;height:54px;width:150px}\\n \u003c/style\u003e\\n \u003ca href=//www.google.com/\u003e\u003cspan id=logo aria-label=Google\u003e\u003c/span\u003e\u003c/a\u003e\\n \u003cp\u003e\u003cb\u003e400.\u003c/b\u003e \u003cins\u003eThat’s an error.\u003c/ins\u003e\\n \u003cp\u003eYour client has issued a malformed or illegal request. \u003cins\u003eThat’s all we know.\u003c/ins\u003e\\n\",\"status\":400,\"rudderJobMetadata\":{\"jobId\":597438246,\"attemptNum\":0,\"userId\":\"\",\"sourceId\":\"2E2b60bs1ybmIKuGvaVxtLT5lYo\",\"destinationId\":\"2E5xYQkj5OVrA3CWexvRLV4b7RH\",\"workspaceId\":\"1sUXvPs0hYgjBxSfjG4gqnRFNoP\",\"secret\":null}},\"message\":\"[GA4 Response Handler] Request failed for destination ga4 with status: 400\",\"statTags\":{\"errorCategory\":\"network\",\"errorType\":\"aborted\",\"destType\":\"GA4\",\"module\":\"destination\",\"implementation\":\"native\",\"feature\":\"dataDelivery\",\"destinationId\":\"2E5xYQkj5OVrA3CWexvRLV4b7RH\",\"workspaceId\":\"1sUXvPs0hYgjBxSfjG4gqnRFNoP\"}}","firstAttemptedAt":"2023-03-30T17:11:25.524Z","content-type":"application/json"}`, + expected: "[GA4 Response Handler] Request failed for destination ga4 with status: 400", + }, + { + inputStr: `{"response":"[GoogleSheets] error :: Failed to insert Payload :: This action would increase the number of cells in the workbook above the limit of 10000000 cells.","firstAttemptedAt":"2023-03-30T17:14:18.737Z","content-type":""}`, + expected: `[GoogleSheets] error :: Failed to insert Payload :: This action would increase the number of cells in the workbook above the limit of 10000000 cells.`, + }, + { + inputStr: `{"response":"{\"msg\":\"Project 9434: The request would increase the number of object fields to 19434 by adding [Application: Invoke Use Positions By Category Mutation, Application: Invoke Use Positions By Category Mutation.request, Application: Invoke Use Positions By Category Mutation.request.cacheID, Application: Invoke Use Positions By Category Mutation.request.metadata, Application: Invoke Use Positions By Category Mutation.request.name, Application: Invoke Use Positions By Category Mutation.request.operationKind, Application: Invoke Use Positions By Category Mutation.request.text, Application: Invoke Use Positions By Category Mutation.variables, Application: Invoke Use Positions By Category Mutation.variables.input, Application: Invoke Use Positions By Category Mutation.variables.input.gigPositionId, Application: Invoke Use Positions By Category Mutation.variables.input.status, Application: Invoke Use Positions By Category Mutation.variables.input.workerId, Application: Pressed Action Row, Application: Pressed Checkbox], which exceeds the limit of 15000\",\"code\":\"UniqueFieldsLimitExceeded\",\"params\":{\"limit\":15000,\"requestedTotal\":19434,\"newFields\":[\"Application: Invoke Use Positions By Category Mutation\",\"Application: Invoke Use Positions By Category Mutation.request\",\"Application: Invoke Use Positions By Category Mutation.request.cacheID\",\"Application: Invoke Use Positions By Category Mutation.request.metadata\",\"Application: Invoke Use Positions By Category Mutation.request.name\",\"Application: Invoke Use Positions By Category Mutation.request.operationKind\",\"Application: Invoke Use Positions By Category Mutation.request.text\",\"Application: Invoke Use Positions By 
Category Mutation.variables\",\"Application: Invoke Use Positions By Category Mutation.variables.input\",\"Application: Invoke Use Positions By Category Mutation.variables.input.gigPositionId\",\"Application: Invoke Use Positions By Category Mutation.variables.input.status\",\"Application: Invoke Use Positions By Category Mutation.variables.input.workerId\",\"Application: Pressed Action Row\",\"Application: Pressed Checkbox\"]}}","firstAttemptedAt":"2023-03-30T17:39:32.189Z","content-type":"application/json"}`, + expected: `Project 9434: The request would increase the number of object fields to 19434 by adding [Application: Invoke Use Positions By Category Mutation, Application: Invoke Use Positions By Category Mutation.request, Application: Invoke Use Positions By Category Mutation.request.cacheID, Application: Invoke Use Positions By Category Mutation.request.metadata, Application: Invoke Use Positions By Category Mutation.request.name, Application: Invoke Use Positions By Category Mutation.request.operationKind, Application: Invoke Use Positions By Category Mutation.request.text, Application: Invoke Use Positions By Category Mutation.variables, Application: Invoke Use Positions By Category Mutation.variables.input, Application: Invoke Use Positions By Category Mutation.variables.input.gigPositionId, Application: Invoke Use Positions By Category Mutation.variables.input.status, Application: Invoke Use Positions By Category Mutation.variables.input.workerId, Application: Pressed Action Row, Application: Pressed Checkbox], which exceeds the limit of 15000`, + }, + { + inputStr: `{"response":"{\"status\":400,\"destinationResponse\":\"\",\"message\":\"Unable to find conversionActionId for conversion:Order Completed\",\"statTags\":{\"errorCategory\":\"network\",\"errorType\":\"aborted\",\"meta\":\"instrumentation\",\"destType\":\"GOOGLE_ADWORDS_ENHANCED_CONVERSIONS\",\"module\":\"destination\",\"implementation\":\"native\",\"feature\":\"dataDelivery\",\"destinationId\":\"2DiY9INMJbBzfCdjgRlvYzsUl57\",\"workspaceId\":\"1zffyLlFcWBMmv4vvtYldWxEdGg\"}}","firstAttemptedAt":"2023-03-30T17:32:47.268Z","content-type":"application/json"}`, + expected: `Unable to find conversionActionId for conversion:Order Completed`, + }, + { + inputStr: `{"response":"{\"status\":400,\"destinationResponse\":{\"response\":[{\"duplicateResut\":{\"allowSave\":true,\"duplicateRule\":\"Contacts_DR\",\"duplicateRuleEntityType\":\"Contact\",\"errorMessage\":\"You're creating a duplicate record. We recommend you use an existing record instead.\",\"matchResults\":[{\"entityType\":\"Contact\",\"errors\":[],\"matchEngine\":\"ExactMatchEngine\",\"matchRecords\":[{\"additionalInformation\":[],\"fieldDiffs\":[],\"matchConfidence\":100,\"record\":{\"attributes\":{\"type\":\"Contact\",\"url\":\"/services/data/v50.0/sobjects/Contact/0031i000013x2TEAAY\"},\"Id\":\"0031i000013x2TEAAY\"}}],\"rule\":\"Contact_MR\",\"size\":1,\"success\":true}]},\"errorCode\":\"DUPLICATES_DETECTED\",\"message\":\"You're creating a duplicate record. We recommend you use an existing record instead.\"}],\"status\":400,\"rudderJobMetadata\":{\"jobId\":1466739020,\"attemptNum\":0,\"userId\":\"\",\"sourceId\":\"2JF2LaBUedeOfAyt9EoIVJylkKS\",\"destinationId\":\"2JJDsqHbkuIZ89ldOBMaBjPi9L7\",\"workspaceId\":\"26TTcz2tQucRs2xZiGThQzGRk2l\",\"secret\":null,\"destInfo\":{\"authKey\":\"2JJDsqHbkuIZ89ldOBMaBjPi9L7\"}}},\"message\":\"Salesforce Request Failed: \\\"400\\\" due to \\\"You're creating a duplicate record. 
We recommend you use an existing record instead.\\\", (Aborted) during Salesforce Response Handling\",\"statTags\":{\"errorCategory\":\"network\",\"errorType\":\"aborted\",\"destType\":\"SALESFORCE\",\"module\":\"destination\",\"implementation\":\"native\",\"feature\":\"dataDelivery\",\"destinationId\":\"2JJDsqHbkuIZ89ldOBMaBjPi9L7\",\"workspaceId\":\"26TTcz2tQucRs2xZiGThQzGRk2l\"}}","firstAttemptedAt":"2023-03-30T17:07:52.359Z","content-type":"application/json"}`, + expected: `You're creating a duplicate record. We recommend you use an existing record instead.`, + }, + { + inputStr: `{"response":"\u003c!--\n ~ Copyright (C) 2010-2021 Evergage, Inc.\n ~ All rights reserved.\n --\u003e\n\n\u003c!DOCTYPE html\u003e\n\u003chtml lang=\"en\"\u003e\n\u003chead\u003e\n \u003cmeta charset=\"UTF-8\"\u003e\n \u003ctitle\u003eSalesforce Personalization\u003c/title\u003e\n \u003clink rel=\"icon\" type=\"image/x-icon\" href=\"https://www.salesforce.com/etc/designs/sfdc-www/en_us/favicon.ico\"/\u003e\n \u003clink rel=\"shortcut icon\" type=\"image/x-icon\" href=\"https://www.salesforce.com/etc/designs/sfdc-www/en_us/favicon.ico\"/\u003e\n \u003cstyle\u003e\n body { font-family: Salesforce Sans,Arial,sans-serif; text-align: center; padding: 50px; background-color:#fff; }\n h1 { font-size: 1.25rem; color: #080707; text-align: center; margin-top: -0.5rem; }\n p { font-size: 0.8125rem; color: #3E3E3C; text-align:center; }\n \u003c/style\u003e\n\u003c/head\u003e\n\u003cbody\u003e\n \u003cdiv align=”center”\u003e\n \u003cimg src=\"/PageNotAvailable.svg\" /\u003e\n \u003c/div\u003e\n \u003cdiv align=”center”\u003e\n \u003ch1\u003eThe page you want isn’t available.\u003c/h1\u003e\n \u003cp\u003eTo find the page you want, use the main navigation.\u003c/p\u003e\n \u003c/div\u003e\n\u003c/body\u003e\n\u003c/html\u003e","firstAttemptedAt":"2024-09-02T06:57:13.829Z","content-type":"text/html"}`, + expected: "The page you want isn’t available.\r\n\r\n To find the page you want, use the main navigation.\r\n\r\n ", + }, + { + inputStr: `{"response":"{\"status\":\"fail\",\"processed\":0,\"unprocessed\":[{\"status\":\"fail\",\"code\":513,\"error\":\"Event Name is incorrect. ErrorCode: 513 - Trying to raise a restricted system event. Skipped record number : 1\",\"record\":{\"evtData\":{\"initial_referrer\":\"https://www.google.com/\",\"initial_referring_domain\":\"www.google.com\",\"path\":\"/busca/\",\"referrer\":\"https://www.extrabom.com.br/busca/?q=Bombril\u0026anonymous=347f65ea66096fd7db4e1bd88211a83dfbe263b78da6a5de0261d160c54100ba\",\"referring_domain\":\"www.extrabom.com.br\",\"search\":\"?q=X14\u0026anonymous=347f65ea66096fd7db4e1bd88211a83dfbe263b78da6a5de0261d160c54100ba\",\"tab_url\":\"https://www.extrabom.com.br/busca/?q=X14\u0026anonymous=347f65ea66096fd7db4e1bd88211a83dfbe263b78da6a5de0261d160c54100ba\",\"title\":\"X14 - Busca - Extrabom\",\"url\":\"https://www.extrabom.com.br/busca/?q=X14\"},\"evtName\":\"Web Page Viewed\",\"identity\":\"69298\",\"type\":\"event\"}}]}","firstAttemptedAt":"2024-09-02T00:40:06.451Z","content-type":"application/json"}`, + expected: "Event Name is incorrect. ErrorCode: 513 - Trying to raise a restricted system event. 
Skipped record number : 1", + }, + { + inputStr: `{"response":"{\"destinationResponse\":\"\u003c!DOCTYPE html\u003e\\n\u003chtml lang=\\\"en\\\" id=\\\"facebook\\\"\u003e\\n \u003chead\u003e\\n \u003ctitle\u003eFacebook | Error\u003c/title\u003e\\n \u003cmeta charset=\\\"utf-8\\\"\u003e\\n \u003cmeta http-equiv=\\\"cache-control\\\" content=\\\"no-cache\\\"\u003e\\n \u003cmeta http-equiv=\\\"cache-control\\\" content=\\\"no-store\\\"\u003e\\n \u003cmeta http-equiv=\\\"cache-control\\\" content=\\\"max-age=0\\\"\u003e\\n \u003cmeta http-equiv=\\\"expires\\\" content=\\\"-1\\\"\u003e\\n \u003cmeta http-equiv=\\\"pragma\\\" content=\\\"no-cache\\\"\u003e\\n \u003cmeta name=\\\"robots\\\" content=\\\"noindex,nofollow\\\"\u003e\\n \u003cstyle\u003e\\n html, body {\\n color: #141823;\\n background-color: #e9eaed;\\n font-family: Helvetica, Lucida Grande, Arial,\\n Tahoma, Verdana, sans-serif;\\n margin: 0;\\n padding: 0;\\n text-align: center;\\n }\\n\\n #header {\\n height: 30px;\\n padding-bottom: 10px;\\n padding-top: 10px;\\n text-align: center;\\n }\\n\\n #icon {\\n width: 30px;\\n }\\n\\n h1 {\\n font-size: 18px;\\n }\\n\\n p {\\n font-size: 13px;\\n }\\n\\n #footer {\\n border-top: 1px solid #ddd;\\n color: #9197a3;\\n font-size: 12px;\\n padding: 5px 8px 6px 0;\\n }\\n \u003c/style\u003e\\n \u003c/head\u003e\\n \u003cbody\u003e\\n \u003cdiv id=\\\"header\\\"\u003e\\n \u003ca href=\\\"//www.facebook.com/\\\"\u003e\\n \u003cimg id=\\\"icon\\\" src=\\\"//static.facebook.com/images/logos/facebook_2x.png\\\" /\u003e\\n \u003c/a\u003e\\n \u003c/div\u003e\\n \u003cdiv id=\\\"core\\\"\u003e\\n \u003ch1 id=\\\"sorry\\\"\u003eSorry, something went wrong.\u003c/h1\u003e\\n \u003cp id=\\\"promise\\\"\u003e\\n We're working on it and we'll get it fixed as soon as we can.\\n \u003c/p\u003e\\n \u003cp id=\\\"back-link\\\"\u003e\\n \u003ca id=\\\"back\\\" href=\\\"//www.facebook.com/\\\"\u003eGo Back\u003c/a\u003e\\n \u003c/p\u003e\\n \u003cdiv id=\\\"footer\\\"\u003e\\n Facebook\\n \u003cspan id=\\\"copyright\\\"\u003e\\n \u0026copy; 2022\\n \u003c/span\u003e\\n \u003cspan id=\\\"help-link\\\"\u003e\\n \u0026#183;\\n \u003ca id=\\\"help\\\" href=\\\"//www.facebook.com/help/\\\"\u003eHelp Center\u003c/a\u003e\\n \u003c/span\u003e\\n \u003c/div\u003e\\n \u003c/div\u003e\\n \u003cscript\u003e\\n document.getElementById('back').onclick = function() {\\n if (history.length \u003e 1) {\\n history.back();\\n return false;\\n }\\n };\\n\\n // Adjust the display based on the window size\\n if (window.innerHeight \u003c 80 || window.innerWidth \u003c 80) {\\n // Blank if window is too small\\n document.body.style.display = 'none';\\n };\\n if (window.innerWidth \u003c 200 || window.innerHeight \u003c 150) {\\n document.getElementById('back-link').style.display = 'none';\\n document.getElementById('help-link').style.display = 'none';\\n };\\n if (window.innerWidth \u003c 200) {\\n document.getElementById('sorry').style.fontSize = '16px';\\n };\\n if (window.innerWidth \u003c 150) {\\n document.getElementById('promise').style.display = 'none';\\n };\\n if (window.innerHeight \u003c 150) {\\n document.getElementById('sorry').style.margin = '4px 0 0 0';\\n document.getElementById('sorry').style.fontSize = '14px';\\n document.getElementById('promise').style.display = 'none';\\n };\\n \u003c/script\u003e\\n \u003c/body\u003e\\n\u003c/html\u003e\\n\",\"message\":\"Request Processed Successfully\",\"status\":502}","firstAttemptedAt":"2023-03-30T17:31:41.884Z","content-type":"application/json"}`, + expected: "Request 
Processed Successfully", + }, + { + inputStr: `{"response":"{\"status\":402,\"message\":\"[Generic Response Handler] Request failed for destination active_campaign with status: 402\",\"destinationResponse\":{\"response\":\"\",\"status\":402,\"rudderJobMetadata\":{\"jobId\":38065100,\"attemptNum\":0,\"userId\":\"\",\"sourceId\":\"1tzHYMqZp5Cogqe4EeagdfpM7Y3\",\"destinationId\":\"1uFSe3P8gJnacuYQIhWFhRfvCGQ\",\"workspaceId\":\"1tyLn8D94vS107gQwxsmhZaFfP2\",\"secret\":null}},\"statTags\":{\"errorCategory\":\"network\",\"errorType\":\"aborted\",\"destType\":\"ACTIVE_CAMPAIGN\",\"module\":\"destination\",\"implementation\":\"native\",\"feature\":\"dataDelivery\",\"destinationId\":\"1uFSe3P8gJnacuYQIhWFhRfvCGQ\",\"workspaceId\":\"1tyLn8D94vS107gQwxsmhZaFfP2\",\"context\":\"[Native Integration Service] Failure During Processor Transform\"}}","firstAttemptedAt":"2023-03-30T17:03:40.883Z","content-type":"application/json"}`, + expected: "[Generic Response Handler] Request failed for destination active_campaign with status: 402", + }, + { + inputStr: `{"response":"{\"message\":\"Valid data must be provided in the 'attributes', 'events', or 'purchases' fields.\",\"errors\":[{\"type\":\"'external_id' or 'braze_id' or 'user_alias' is required\",\"input_array\":\"attributes\",\"index\":0}]}","firstAttemptedAt":"2023-03-30T17:26:36.847Z","content-type":"application/json"}`, + expected: "Valid data must be provided in the 'attributes', 'events', or 'purchases' fields.", + }, + { + inputStr: `{"fetching_remote_schema_failed":{"attempt":6,"errors":["dial tcp 159.223.171.199:38649: connect: connection refused","dial tcp 159.223.171.199:38649: connect: connection refused","dial tcp 159.223.171.199:38649: connect: connection refused","dial tcp 159.223.171.199:38649: connect: connection refused","dial tcp 159.223.171.199:38649: connect: connection refused","dial tcp 159.223.171.199:38649: connect: connection refused"]}}`, + expected: `dial tcp 159.223.171.199:38649: connect: connection refused`, + }, + { + inputStr: `{"exporting_data_failed":{"attempt":5,"errors":["2 errors occurred:\n1 errors occurred:\nloading identifies table: inserting into original table: pq: Value out of range for 4 bytes.\n1 errors occurred:\nupdate schema: adding columns to warehouse: failed to add columns for table logout in namespace raw_gtm of destination RS:2MeRrhS670OOhZv6gBezLipeVtm with error: pq: must be owner of relation logout","2 errors occurred:\n1 errors occurred:\nupdate schema: adding columns to warehouse: failed to add columns for table logout in namespace raw_gtm of destination RS:2MeRrhS670OOhZv6gBezLipeVtm with error: pq: must be owner of relation logout\n1 errors occurred:\nloading identifies table: inserting into original table: pq: Value out of range for 4 bytes.","2 errors occurred:\n1 errors occurred:\nupdate schema: adding columns to warehouse: failed to add columns for table logout in namespace raw_gtm of destination RS:2MeRrhS670OOhZv6gBezLipeVtm with error: pq: must be owner of relation logout\n1 errors occurred:\nloading identifies table: inserting into original table: pq: Value out of range for 4 bytes.","2 errors occurred:\n1 errors occurred:\nupdate schema: adding columns to warehouse: failed to add columns for table logout in namespace raw_gtm of destination RS:2MeRrhS670OOhZv6gBezLipeVtm with error: pq: must be owner of relation logout\n1 errors occurred:\nloading identifies table: inserting into original table: pq: Value out of range for 4 bytes.","2 errors occurred:\n1 errors occurred:\nupdate schema: 
adding columns to warehouse: failed to add columns for table logout in namespace raw_gtm of destination RS:2MeRrhS670OOhZv6gBezLipeVtm with error: pq: must be owner of relation logout\n1 errors occurred:\nloading identifies table: inserting into original table: pq: Value out of range for 4 bytes."]}}`, + expected: "2 errors occurred:\n1 errors occurred:\nloading identifies table: inserting into original table: pq: Value out of range for 4 bytes.\n1 errors occurred:\nupdate schema: adding columns to warehouse: failed to add columns for table logout in namespace raw_gtm of destination RS:2MeRrhS670OOhZv6gBezLipeVtm with error: pq: must be owner of relation logout.2 errors occurred:\n1 errors occurred:\nupdate schema: adding columns to warehouse: failed to add columns for table logout in namespace raw_gtm of destination RS:2MeRrhS670OOhZv6gBezLipeVtm with error: pq: must be owner of relation logout\n1 errors occurred:\nloading identifies table: inserting into original table: pq: Value out of range for 4 bytes.", + }, + { + inputStr: `{"response":"{\n\t\t\t\tError: invalid character 'P' looking for beginning of value,\n\t\t\t\t(trRespStCd, trRespBody): (504, Post \"http://transformer.rudder-us-east-1b-blue/v0/destinations/google_adwords_enhanced_conversions/proxy\": context deadline exceeded (Client.Timeout exceeded while awaiting headers)),\n\t\t\t}","firstAttemptedAt":"2023-03-30T17:24:58.068Z","content-type":"text/plain; charset=utf-8"}`, + expected: "{\n\t\t\t\tError: invalid character 'P' looking for beginning of value,\n\t\t\t\t(trRespStCd, trRespBody): (504, Post \"http://transformer.rudder-us-east-1b-blue/v0/destinations/google_adwords_enhanced_conversions/proxy\": context deadline exceeded (Client.Timeout exceeded while awaiting headers)),\n\t\t\t}", + }, + { + inputStr: `{"error":"{\"message\":\"some random message\",\"destinationResponse\":{\"error\":{\"message\":\"Unhandled random error\",\"type\":\"RandomException\",\"code\":5,\"error_subcode\":12,\"fbtrace_id\":\"facebook_px_trace_id_10\"},\"status\":412}}","firstAttemptedAt":"2023-04-20T17:24:58.068Z","content-type":"text/plain; charset=utf-8"}`, + expected: "Unhandled random error", + }, + { + inputStr: `{"error":"unknown error occurred","firstAttemptedAt":"2023-04-21T17:24:58.068Z","content-type":"text/plain; charset=utf-8"}`, + expected: "unknown error occurred", + }, +} + +func BenchmarkJsonNestedSearch(b *testing.B) { + extractor := NewErrorDetailExtractor(logger.NOP) + + b.Run("JsonNested used fn", func(b *testing.B) { + for i := 0; i < len(tcs); i++ { + extractor.GetErrorMessage(tcs[i].inputStr) + } + }) +} + +func TestAggregationLogic(t *testing.T) { + dbErrs := []*types.EDReportsDB{ + { + PU: "dest_transformer", + EDInstanceDetails: types.EDInstanceDetails{ + WorkspaceID: "wsp1", + InstanceID: "instance-1", + Namespace: "nmspc", + }, + EDConnectionDetails: types.EDConnectionDetails{ + SourceID: "src-1", + SourceDefinitionId: "src-def-1", + DestinationDefinitionId: "des-def-1", + DestinationID: "des-1", + DestType: "DES_1", + }, + EDErrorDetails: types.EDErrorDetails{ + EDErrorDetailsKey: types.EDErrorDetailsKey{ + StatusCode: 200, + ErrorCode: "", + ErrorMessage: "", + EventType: "identify", + }, + }, + ReportMetadata: types.ReportMetadata{ + ReportedAt: 124335445, + }, + Count: 10, + }, + { + PU: "dest_transformer", + EDInstanceDetails: types.EDInstanceDetails{ + WorkspaceID: "wsp1", + InstanceID: "instance-1", + Namespace: "nmspc", + }, + EDConnectionDetails: types.EDConnectionDetails{ + SourceID: "src-1", + 
SourceDefinitionId: "src-def-1", + DestinationDefinitionId: "des-def-1", + DestinationID: "des-1", + DestType: "DES_1", + }, + EDErrorDetails: types.EDErrorDetails{ + EDErrorDetailsKey: types.EDErrorDetailsKey{ + StatusCode: 400, + ErrorCode: "", + ErrorMessage: "bad data sent for transformation", + EventType: "identify", + }, + }, + ReportMetadata: types.ReportMetadata{ + ReportedAt: 124335445, + }, + Count: 5, + }, + { + PU: "dest_transformer", + EDInstanceDetails: types.EDInstanceDetails{ + WorkspaceID: "wsp1", + InstanceID: "instance-1", + Namespace: "nmspc", + }, + EDConnectionDetails: types.EDConnectionDetails{ + SourceID: "src-1", + SourceDefinitionId: "src-def-1", + DestinationDefinitionId: "des-def-1", + DestinationID: "des-1", + DestType: "DES_1", + }, + EDErrorDetails: types.EDErrorDetails{ + EDErrorDetailsKey: types.EDErrorDetailsKey{ + StatusCode: 400, + ErrorCode: "", + ErrorMessage: "bad data sent for transformation", + EventType: "identify", + }, + }, + ReportMetadata: types.ReportMetadata{ + ReportedAt: 124335445, + }, + Count: 15, + }, + { + PU: "dest_transformer", + EDInstanceDetails: types.EDInstanceDetails{ + WorkspaceID: "wsp1", + InstanceID: "instance-1", + Namespace: "nmspc", + }, + EDConnectionDetails: types.EDConnectionDetails{ + SourceID: "src-1", + SourceDefinitionId: "src-def-1", + DestinationDefinitionId: "des-def-1", + DestinationID: "des-1", + DestType: "DES_1", + }, + EDErrorDetails: types.EDErrorDetails{ + EDErrorDetailsKey: types.EDErrorDetailsKey{ + StatusCode: 400, + ErrorCode: "", + ErrorMessage: "user_id information missing", + EventType: "identify", + }, + }, + ReportMetadata: types.ReportMetadata{ + ReportedAt: 124335446, + }, + Count: 20, + }, + // error occurred at router level(assume this is batching enabled) + { + PU: "router", + EDInstanceDetails: types.EDInstanceDetails{ + WorkspaceID: "wsp1", + InstanceID: "instance-1", + Namespace: "nmspc", + }, + EDConnectionDetails: types.EDConnectionDetails{ + SourceID: "src-1", + SourceDefinitionId: "src-def-1", + DestinationDefinitionId: "des-def-1", + DestinationID: "des-1", + DestType: "DES_1", + }, + EDErrorDetails: types.EDErrorDetails{ + EDErrorDetailsKey: types.EDErrorDetailsKey{ + StatusCode: 500, + ErrorCode: "", + ErrorMessage: "Cannot read type property of undefined", // some error during batching + EventType: "identify", + }, + }, + ReportMetadata: types.ReportMetadata{ + ReportedAt: 124335446, + }, + Count: 15, + }, + } + configSubscriber := newConfigSubscriber(logger.NOP) + ed := NewErrorDetailReporter(context.Background(), configSubscriber, stats.NOP, config.Default) + reportingMetrics := ed.aggregate(dbErrs) + + reportResults := []*types.EDMetric{ + { + PU: dbErrs[0].PU, + EDInstanceDetails: types.EDInstanceDetails{ + WorkspaceID: dbErrs[0].WorkspaceID, + InstanceID: dbErrs[0].InstanceID, + Namespace: dbErrs[0].Namespace, + }, + EDConnectionDetails: types.EDConnectionDetails{ + SourceID: dbErrs[0].SourceID, + SourceDefinitionId: dbErrs[0].SourceDefinitionId, + DestinationDefinitionId: dbErrs[0].DestinationDefinitionId, + DestinationID: dbErrs[0].DestinationID, + DestType: dbErrs[0].DestType, + }, + ReportMetadata: types.ReportMetadata{ + ReportedAt: dbErrs[0].ReportedAt * 60 * 1000, + }, + Errors: []types.EDErrorDetails{ + { + EDErrorDetailsKey: types.EDErrorDetailsKey{ + StatusCode: dbErrs[0].StatusCode, + ErrorCode: dbErrs[0].ErrorCode, + ErrorMessage: dbErrs[0].ErrorMessage, + EventType: dbErrs[0].EventType, + }, + ErrorCount: 10, + }, + { + EDErrorDetailsKey: types.EDErrorDetailsKey{ 
+ StatusCode: dbErrs[1].StatusCode, + ErrorCode: dbErrs[1].ErrorCode, + ErrorMessage: dbErrs[1].ErrorMessage, + EventType: dbErrs[1].EventType, + }, + ErrorCount: 20, + }, + }, + }, + { + PU: dbErrs[3].PU, + EDInstanceDetails: types.EDInstanceDetails{ + WorkspaceID: dbErrs[3].WorkspaceID, + InstanceID: dbErrs[3].InstanceID, + Namespace: dbErrs[3].Namespace, + }, + EDConnectionDetails: types.EDConnectionDetails{ + SourceID: dbErrs[3].SourceID, + SourceDefinitionId: dbErrs[3].SourceDefinitionId, + DestinationDefinitionId: dbErrs[3].DestinationDefinitionId, + DestinationID: dbErrs[3].DestinationID, + DestType: dbErrs[3].DestType, + }, + ReportMetadata: types.ReportMetadata{ + ReportedAt: dbErrs[3].ReportedAt * 60 * 1000, + }, + Errors: []types.EDErrorDetails{ + { + EDErrorDetailsKey: types.EDErrorDetailsKey{ + StatusCode: dbErrs[3].StatusCode, + ErrorCode: dbErrs[3].ErrorCode, + ErrorMessage: dbErrs[3].ErrorMessage, + EventType: dbErrs[3].EventType, + }, + ErrorCount: 20, + }, + }, + }, + { + PU: dbErrs[4].PU, + EDInstanceDetails: types.EDInstanceDetails{ + WorkspaceID: dbErrs[4].WorkspaceID, + InstanceID: dbErrs[4].InstanceID, + Namespace: dbErrs[4].Namespace, + }, + EDConnectionDetails: types.EDConnectionDetails{ + SourceID: dbErrs[4].SourceID, + SourceDefinitionId: dbErrs[4].SourceDefinitionId, + DestinationDefinitionId: dbErrs[4].DestinationDefinitionId, + DestinationID: dbErrs[4].DestinationID, + DestType: dbErrs[4].DestType, + }, + ReportMetadata: types.ReportMetadata{ + ReportedAt: dbErrs[4].ReportedAt * 60 * 1000, + }, + Errors: []types.EDErrorDetails{ + { + EDErrorDetailsKey: types.EDErrorDetailsKey{ + StatusCode: dbErrs[4].StatusCode, + ErrorCode: dbErrs[4].ErrorCode, + ErrorMessage: dbErrs[4].ErrorMessage, + EventType: dbErrs[4].EventType, + }, + ErrorCount: 15, + }, + }, + }, + } + + require.Equal(t, reportResults, reportingMetrics) +} diff --git a/enterprise/reporting/event_sampler/event_sampler.go b/enterprise/reporting/event_sampler/event_sampler.go index 8e3da3fd0e..18bb1d2b2f 100644 --- a/enterprise/reporting/event_sampler/event_sampler.go +++ b/enterprise/reporting/event_sampler/event_sampler.go @@ -9,11 +9,13 @@ import ( ) const ( - BadgerTypeEventSampler = "badger" - InMemoryCacheTypeEventSampler = "in_memory_cache" - BadgerEventSamplerPathName = "/reporting-badger" + BadgerTypeEventSampler = "badger" + InMemoryCacheTypeEventSampler = "in_memory_cache" + BadgerEventSamplerMetricsPathName = "/metrics-reporting-badger" + BadgerEventSamplerErrorsPathName = "/errors-reporting-badger" ) +//go:generate mockgen -destination=../../../mocks/enterprise/reporting/event_sampler/mock_event_sampler.go -package=mocks github.com/rudderlabs/rudder-server/enterprise/reporting/event_sampler EventSampler type EventSampler interface { Put(key string) error Get(key string) (bool, error) @@ -25,17 +27,18 @@ func NewEventSampler( ttl config.ValueLoader[time.Duration], eventSamplerType config.ValueLoader[string], eventSamplingCardinality config.ValueLoader[int], + badgerDBPath string, conf *config.Config, log logger.Logger, ) (es EventSampler, err error) { switch eventSamplerType.Load() { case BadgerTypeEventSampler: - es, err = NewBadgerEventSampler(ctx, BadgerEventSamplerPathName, ttl, conf, log) + es, err = NewBadgerEventSampler(ctx, badgerDBPath, ttl, conf, log) case InMemoryCacheTypeEventSampler: es, err = NewInMemoryCacheEventSampler(ctx, ttl, eventSamplingCardinality) default: log.Warnf("invalid event sampler type: %s. 
Using default badger event sampler", eventSamplerType.Load()) - es, err = NewBadgerEventSampler(ctx, BadgerEventSamplerPathName, ttl, conf, log) + es, err = NewBadgerEventSampler(ctx, badgerDBPath, ttl, conf, log) } if err != nil { diff --git a/enterprise/reporting/event_sampler/event_sampler_test.go b/enterprise/reporting/event_sampler/event_sampler_test.go index 4d47f24123..ae4dc22534 100644 --- a/enterprise/reporting/event_sampler/event_sampler_test.go +++ b/enterprise/reporting/event_sampler/event_sampler_test.go @@ -23,7 +23,7 @@ func TestBadger(t *testing.T) { t.Run("should put and get keys", func(t *testing.T) { assert.Equal(t, 3000*time.Millisecond, ttl.Load()) - es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, conf, log) + es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, BadgerEventSamplerMetricsPathName, conf, log) _ = es.Put("key1") _ = es.Put("key2") _ = es.Put("key3") @@ -43,7 +43,7 @@ func TestBadger(t *testing.T) { conf.Set("Reporting.eventSampling.durationInMinutes", 100) assert.Equal(t, 100*time.Millisecond, ttl.Load()) - es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, conf, log) + es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, BadgerEventSamplerMetricsPathName, conf, log) defer es.Close() _ = es.Put("key1") @@ -65,7 +65,7 @@ func TestInMemoryCache(t *testing.T) { t.Run("should put and get keys", func(t *testing.T) { assert.Equal(t, 3000*time.Millisecond, ttl.Load()) - es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, conf, log) + es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, BadgerEventSamplerMetricsPathName, conf, log) _ = es.Put("key1") _ = es.Put("key2") _ = es.Put("key3") @@ -83,7 +83,7 @@ func TestInMemoryCache(t *testing.T) { t.Run("should not get evicted keys", func(t *testing.T) { conf.Set("Reporting.eventSampling.durationInMinutes", 100) assert.Equal(t, 100*time.Millisecond, ttl.Load()) - es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, conf, log) + es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, BadgerEventSamplerMetricsPathName, conf, log) _ = es.Put("key1") require.Eventually(t, func() bool { @@ -95,7 +95,7 @@ func TestInMemoryCache(t *testing.T) { t.Run("should not add keys if length exceeds", func(t *testing.T) { conf.Set("Reporting.eventSampling.durationInMinutes", 3000) assert.Equal(t, 3000*time.Millisecond, ttl.Load()) - es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, conf, log) + es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, BadgerEventSamplerMetricsPathName, conf, log) _ = es.Put("key1") _ = es.Put("key2") _ = es.Put("key3") @@ -147,6 +147,7 @@ func BenchmarkEventSampler(b *testing.B) { ttl, eventSamplerType, eventSamplingCardinality, + BadgerEventSamplerMetricsPathName, conf, log, ) diff --git a/enterprise/reporting/label_set.go b/enterprise/reporting/label_set.go index 8910763212..5244eaa376 100644 --- a/enterprise/reporting/label_set.go +++ b/enterprise/reporting/label_set.go @@ -32,6 +32,8 @@ type LabelSet struct { EventName string EventType string ErrorType string + ErrorCode string + ErrorMessage string Bucket int64 } @@ -60,11 +62,13 @@ func NewLabelSet(metric types.PUReportedMetric, bucket int64) LabelSet { EventType: metric.StatusDetail.EventType, ErrorType: metric.StatusDetail.ErrorType, Bucket: bucket, + ErrorCode: 
metric.StatusDetail.ErrorDetails.Code, + ErrorMessage: metric.StatusDetail.ErrorDetails.Message, } } func (labelSet LabelSet) generateHash() string { - data := labelSet.WorkspaceID + labelSet.SourceDefinitionID + labelSet.SourceCategory + labelSet.SourceID + labelSet.DestinationDefinitionID + labelSet.DestinationID + labelSet.SourceTaskRunID + labelSet.SourceJobID + labelSet.SourceJobRunID + labelSet.TransformationID + labelSet.TransformationVersionID + labelSet.TrackingPlanID + strconv.Itoa(labelSet.TrackingPlanVersion) + labelSet.InPU + labelSet.PU + labelSet.Status + strconv.FormatBool(labelSet.TerminalState) + strconv.FormatBool(labelSet.InitialState) + strconv.Itoa(labelSet.StatusCode) + labelSet.EventName + labelSet.EventType + labelSet.ErrorType + strconv.FormatInt(labelSet.Bucket, 10) + data := labelSet.WorkspaceID + labelSet.SourceDefinitionID + labelSet.SourceCategory + labelSet.SourceID + labelSet.DestinationDefinitionID + labelSet.DestinationID + labelSet.SourceTaskRunID + labelSet.SourceJobID + labelSet.SourceJobRunID + labelSet.TransformationID + labelSet.TransformationVersionID + labelSet.TrackingPlanID + strconv.Itoa(labelSet.TrackingPlanVersion) + labelSet.InPU + labelSet.PU + labelSet.Status + strconv.FormatBool(labelSet.TerminalState) + strconv.FormatBool(labelSet.InitialState) + strconv.Itoa(labelSet.StatusCode) + labelSet.EventName + labelSet.EventType + labelSet.ErrorType + strconv.FormatInt(labelSet.Bucket, 10) + labelSet.ErrorCode + labelSet.ErrorMessage hash := murmur3.Sum64([]byte(data)) return hex.EncodeToString([]byte(strconv.FormatUint(hash, 16))) } diff --git a/enterprise/reporting/label_set_test.go b/enterprise/reporting/label_set_test.go index c4722761a0..6f2cbef4e2 100644 --- a/enterprise/reporting/label_set_test.go +++ b/enterprise/reporting/label_set_test.go @@ -4,12 +4,15 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/rudderlabs/rudder-server/utils/types" ) -func createMetricObject(eventName string) types.PUReportedMetric { - return types.PUReportedMetric{ +const someEventName = "some-event-name" + +func createMetricObject(eventName, errorMessage string) types.PUReportedMetric { + metric := types.PUReportedMetric{ ConnectionDetails: types.ConnectionDetails{ SourceID: "some-source-id", DestinationID: "some-destination-id", @@ -28,54 +31,86 @@ func createMetricObject(eventName string) types.PUReportedMetric { EventType: "some-event-type", }, } + if errorMessage != "" { + metric.StatusDetail.ErrorDetails = types.ErrorDetails{ + Code: "some-error-code", + Message: errorMessage, + } + } + return metric } func TestNewLabelSet(t *testing.T) { t.Run("should create the correct LabelSet from types.PUReportedMetric", func(t *testing.T) { - inputMetric := createMetricObject("some-event-name") - bucket := int64(28889820) - labelSet := NewLabelSet(inputMetric, bucket) - - assert.Equal(t, "some-source-id", labelSet.SourceID) - assert.Equal(t, "some-event-name", labelSet.EventName) // Default value - }) - - t.Run("should create the correct LabelSet with custom EventName", func(t *testing.T) { - inputMetric := createMetricObject("custom-event-name") + inputMetric := createMetricObject(someEventName, "") bucket := int64(28889820) labelSet := NewLabelSet(inputMetric, bucket) assert.Equal(t, "some-source-id", labelSet.SourceID) - assert.Equal(t, "custom-event-name", labelSet.EventName) // Custom event name + assert.Equal(t, someEventName, labelSet.EventName) // Default value }) } func TestGenerateHash(t 
*testing.T) { - t.Run("same hash for same LabelSet", func(t *testing.T) { - inputMetric1 := createMetricObject("some-event-name") - bucket := int64(28889820) - labelSet1 := NewLabelSet(inputMetric1, bucket) - - inputMetric2 := createMetricObject("some-event-name") - labelSet2 := NewLabelSet(inputMetric2, bucket) - - hash1 := labelSet1.generateHash() - hash2 := labelSet2.generateHash() - - assert.Equal(t, hash1, hash2) - }) - - t.Run("different hash for different LabelSet", func(t *testing.T) { - inputMetric1 := createMetricObject("some-event-name-1") - bucket := int64(28889820) - labelSet1 := NewLabelSet(inputMetric1, bucket) + tests := []struct { + name string + metric1 types.PUReportedMetric + metric2 types.PUReportedMetric + bucket1 int64 + bucket2 int64 + shouldHashesMatch bool + }{ + { + name: "same hash for same LabelSet for metrics", + metric1: createMetricObject(someEventName, ""), + metric2: createMetricObject(someEventName, ""), + bucket1: 28889820, + bucket2: 28889820, + shouldHashesMatch: true, + }, + { + name: "different hash for label set with different event name for metrics", + metric1: createMetricObject(someEventName, ""), + metric2: createMetricObject("some-event-name-2", ""), + bucket1: 28889820, + bucket2: 28889820, + shouldHashesMatch: false, + }, + { + name: "different hash for label set with different buckets for metrics", + metric1: createMetricObject(someEventName, ""), + metric2: createMetricObject(someEventName, ""), + bucket1: 28889000, + bucket2: 28889820, + shouldHashesMatch: false, + }, + { + name: "same hash for same LabelSet for errors", + metric1: createMetricObject(someEventName, "Some error message"), + metric2: createMetricObject(someEventName, "Some error message"), + bucket1: 28889820, + bucket2: 28889820, + shouldHashesMatch: true, + }, + { + name: "different hash for different LabelSet with different messages for errors", + metric1: createMetricObject(someEventName, "Some error message 1"), + metric2: createMetricObject(someEventName, "Some error message 2"), + bucket1: 28889820, + bucket2: 28889820, + shouldHashesMatch: false, + }, + } - inputMetric2 := createMetricObject("some-event-name-2") - labelSet2 := NewLabelSet(inputMetric2, bucket) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + labelSet1 := NewLabelSet(test.metric1, test.bucket1) + labelSet2 := NewLabelSet(test.metric2, test.bucket2) - hash1 := labelSet1.generateHash() - hash2 := labelSet2.generateHash() + hash1 := labelSet1.generateHash() + hash2 := labelSet2.generateHash() - assert.NotEqual(t, hash1, hash2) - }) + require.Equal(t, test.shouldHashesMatch, hash1 == hash2) + }) + } } diff --git a/enterprise/reporting/reporting.go b/enterprise/reporting/reporting.go index 8fddbec0fe..983289e89f 100644 --- a/enterprise/reporting/reporting.go +++ b/enterprise/reporting/reporting.go @@ -108,7 +108,7 @@ func NewDefaultReporter(ctx context.Context, conf *config.Config, log logger.Log if eventSamplingEnabled.Load() { var err error - eventSampler, err = event_sampler.NewEventSampler(ctx, eventSamplingDuration, eventSamplerType, eventSamplingCardinality, conf, log) + eventSampler, err = event_sampler.NewEventSampler(ctx, eventSamplingDuration, eventSamplerType, eventSamplingCardinality, event_sampler.BadgerEventSamplerMetricsPathName, conf, log) if err != nil { panic(err) } @@ -239,7 +239,7 @@ func (r *DefaultReporter) getReports(currentMs, aggregationIntervalMin int64, sy return nil, 0, nil } - bucketStart, bucketEnd := r.getAggregationBucketMinute(queryMin.Int64, 
@@ -239,7 +239,7 @@ func (r *DefaultReporter) getReports(currentMs, aggregationIntervalMin int64, sy
 		return nil, 0, nil
 	}
 
-	bucketStart, bucketEnd := r.getAggregationBucketMinute(queryMin.Int64, aggregationIntervalMin)
+	bucketStart, bucketEnd := getAggregationBucketMinute(queryMin.Int64, aggregationIntervalMin)
 	// we don't want to flush partial buckets, so we wait for the current bucket to be complete
 	if bucketEnd > currentMs {
 		return nil, 0, nil
@@ -318,7 +318,7 @@ func (r *DefaultReporter) getReports(currentMs, aggregationIntervalMin int64, sy
 func (r *DefaultReporter) getAggregatedReports(reports []*types.ReportByStatus) []*types.Metric {
 	metricsByGroup := map[string]*types.Metric{}
 	maxReportsCountInARequest := r.maxReportsCountInARequest.Load()
-	sampleEventBucket, _ := r.getAggregationBucketMinute(reports[0].ReportedAt, int64(r.eventSamplingDuration.Load().Minutes()))
+	sampleEventBucket, _ := getAggregationBucketMinute(reports[0].ReportedAt, int64(r.eventSamplingDuration.Load().Minutes()))
 	var values []*types.Metric
 
 	reportIdentifier := func(report *types.ReportByStatus) string {
@@ -393,30 +393,6 @@ func (r *DefaultReporter) getAggregatedReports(reports []*types.ReportByStatus)
 	return values
 }
 
-func (*DefaultReporter) getAggregationBucketMinute(timeMs, intervalMs int64) (int64, int64) {
-	// If interval is not a factor of 60, then the bucket start will not be aligned to hour start
-	// For example, if intervalMs is 7, and timeMs is 28891085 (6:05) then the bucket start will be 28891079 (5:59)
-	// and current bucket will contain the data of 2 different hourly buckets, which is should not have happened.
-	// To avoid this, we round the intervalMs to the nearest factor of 60.
-	if intervalMs <= 0 || 60%intervalMs != 0 {
-		factors := []int64{1, 2, 3, 4, 5, 6, 10, 12, 15, 20, 30, 60}
-		closestFactor := factors[0]
-		for _, factor := range factors {
-			if factor < intervalMs {
-				closestFactor = factor
-			} else {
-				break
-			}
-		}
-		intervalMs = closestFactor
-	}
-
-	bucketStart := timeMs - (timeMs % intervalMs)
-	bucketEnd := bucketStart + intervalMs
-
-	return bucketStart, bucketEnd
-}
-
 func (r *DefaultReporter) emitLagMetric(ctx context.Context, c types.SyncerConfig, lastReportedAtTime *atomic.Time) error {
 	// for monitoring reports pileups
 	reportingLag := r.stats.NewTaggedStat(
@@ -538,7 +514,7 @@ func (r *DefaultReporter) mainLoop(ctx context.Context, c types.SyncerConfig) {
 			return err
 		}
 		// Use the same aggregationIntervalMin value that was used to query the reports in getReports()
-		bucketStart, bucketEnd := r.getAggregationBucketMinute(reportedAt, aggregationIntervalMin)
+		bucketStart, bucketEnd := getAggregationBucketMinute(reportedAt, aggregationIntervalMin)
 		_, err = dbHandle.Exec(`DELETE FROM `+ReportsTable+` WHERE reported_at >= $1 and reported_at < $2`, bucketStart, bucketEnd)
 		if err != nil {
 			r.log.Errorf(`[ Reporting ]: Error deleting local reports from %s: %v`, ReportsTable, err)
@@ -662,63 +638,6 @@ func (r *DefaultReporter) sendMetric(ctx context.Context, netClient *http.Client
 	return err
 }
 
-func isMetricPosted(status int) bool {
-	return status >= 200 && status < 300
-}
-
-func getPIIColumnsToExclude() []string {
-	piiColumnsToExclude := strings.Split(config.GetString("REPORTING_PII_COLUMNS_TO_EXCLUDE", "sample_event,sample_response"), ",")
-	for i := range piiColumnsToExclude {
-		piiColumnsToExclude[i] = strings.Trim(piiColumnsToExclude[i], " ")
-	}
-	return piiColumnsToExclude
-}
-
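The method removed above re-appears as the package-level getAggregationBucketMinute (exercised by the new util_test.go at the end of this diff). Its rounding rule is easiest to see by running it; the function body below is copied verbatim from the diff, with only a main added to exercise the example from its own comment:

package main

import "fmt"

// Same logic as getAggregationBucketMinute above, reproduced so the
// factor-of-60 rounding can be run in isolation. Both arguments are
// minutes since the epoch, despite the "Ms" suffix in the original names.
func getAggregationBucketMinute(timeMs, intervalMs int64) (int64, int64) {
	if intervalMs <= 0 || 60%intervalMs != 0 {
		factors := []int64{1, 2, 3, 4, 5, 6, 10, 12, 15, 20, 30, 60}
		closestFactor := factors[0]
		for _, factor := range factors {
			if factor < intervalMs {
				closestFactor = factor
			} else {
				break
			}
		}
		intervalMs = closestFactor
	}

	bucketStart := timeMs - (timeMs % intervalMs)
	bucketEnd := bucketStart + intervalMs

	return bucketStart, bucketEnd
}

func main() {
	// The example from the removed comment: a 7-minute interval is rounded
	// down to the factor 6, so minute 28891085 lands in the hour-aligned
	// bucket [28891080, 28891086) instead of straddling two hourly buckets.
	fmt.Println(getAggregationBucketMinute(28891085, 7)) // 28891080 28891086
}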
metric.StatusDetail.SampleResponse = "" - case "event_name": - metric.StatusDetail.EventName = "" - case "event_type": - metric.StatusDetail.EventType = "" - } - } - - return metric -} - -func (r *DefaultReporter) transformMetricWithEventSampling(metric types.PUReportedMetric, reportedAt int64) (types.PUReportedMetric, error) { - if r.eventSampler == nil { - return metric, nil - } - - isValidSampleEvent := metric.StatusDetail.SampleEvent != nil && string(metric.StatusDetail.SampleEvent) != "{}" - - if isValidSampleEvent { - sampleEventBucket, _ := r.getAggregationBucketMinute(reportedAt, int64(r.eventSamplingDuration.Load().Minutes())) - hash := NewLabelSet(metric, sampleEventBucket).generateHash() - found, err := r.eventSampler.Get(hash) - if err != nil { - return metric, err - } - - if found { - metric.StatusDetail.SampleEvent = json.RawMessage(`{}`) - metric.StatusDetail.SampleResponse = "" - } else { - err := r.eventSampler.Put(hash) - if err != nil { - return metric, err - } - } - } - return metric, nil -} - func (r *DefaultReporter) Report(ctx context.Context, metrics []*types.PUReportedMetric, txn *Tx) error { if len(metrics) == 0 { return nil @@ -767,7 +686,7 @@ func (r *DefaultReporter) Report(ctx context.Context, metrics []*types.PUReporte } if r.eventSamplingEnabled.Load() { - metric, err = r.transformMetricWithEventSampling(metric, reportedAt) + metric, err = transformMetricWithEventSampling(metric, reportedAt, r.eventSampler, int64(r.eventSamplingDuration.Load().Minutes())) if err != nil { return err } diff --git a/enterprise/reporting/reporting_test.go b/enterprise/reporting/reporting_test.go index 63464de5c6..6a2f60469c 100644 --- a/enterprise/reporting/reporting_test.go +++ b/enterprise/reporting/reporting_test.go @@ -2,19 +2,15 @@ package reporting import ( "context" - "fmt" "testing" - "time" - - "github.com/rudderlabs/rudder-go-kit/config" - "github.com/rudderlabs/rudder-go-kit/stats" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-server/utils/types" ) @@ -78,6 +74,25 @@ var _ = Describe("Reporting", func() { }) }) +func assertReportMetric(expectedMetric, actualMetric types.PUReportedMetric) { + Expect(expectedMetric.ConnectionDetails.SourceID).To(Equal(actualMetric.ConnectionDetails.SourceID)) + Expect(expectedMetric.ConnectionDetails.DestinationID).To(Equal(actualMetric.ConnectionDetails.DestinationID)) + Expect(expectedMetric.ConnectionDetails.SourceJobID).To(Equal(actualMetric.ConnectionDetails.SourceJobID)) + Expect(expectedMetric.ConnectionDetails.SourceJobRunID).To(Equal(actualMetric.ConnectionDetails.SourceJobRunID)) + Expect(expectedMetric.ConnectionDetails.SourceTaskRunID).To(Equal(actualMetric.ConnectionDetails.SourceTaskRunID)) + Expect(expectedMetric.PUDetails.InPU).To(Equal(actualMetric.PUDetails.InPU)) + Expect(expectedMetric.PUDetails.PU).To(Equal(actualMetric.PUDetails.PU)) + Expect(expectedMetric.PUDetails.TerminalPU).To(Equal(actualMetric.PUDetails.TerminalPU)) + Expect(expectedMetric.PUDetails.InitialPU).To(Equal(actualMetric.PUDetails.InitialPU)) + Expect(expectedMetric.StatusDetail.Status).To(Equal(actualMetric.StatusDetail.Status)) + Expect(expectedMetric.StatusDetail.StatusCode).To(Equal(actualMetric.StatusDetail.StatusCode)) + Expect(expectedMetric.StatusDetail.Count).To(Equal(actualMetric.StatusDetail.Count)) + Expect(expectedMetric.StatusDetail.SampleResponse).To(Equal(actualMetric.StatusDetail.SampleResponse)) + Expect(expectedMetric.StatusDetail.SampleEvent).To(Equal(actualMetric.StatusDetail.SampleEvent)) + Expect(expectedMetric.StatusDetail.EventName).To(Equal(actualMetric.StatusDetail.EventName)) + Expect(expectedMetric.StatusDetail.EventType).To(Equal(actualMetric.StatusDetail.EventType)) +} + func TestGetAggregatedReports(t *testing.T) { inputReports := []*types.ReportByStatus{ { @@ -170,7 +185,7 @@ func TestGetAggregatedReports(t *testing.T) { t.Run("Should provide aggregated reports when batch size is 1", func(t *testing.T) { conf.Set("Reporting.maxReportsCountInARequest", 1) assert.Equal(t, 1, reportHandle.maxReportsCountInARequest.Load()) - bucket, _ := reportHandle.getAggregationBucketMinute(28017690, 10) + bucket, _ := getAggregationBucketMinute(28017690, 10) expectedResponse := []*types.Metric{ { InstanceDetails: types.InstanceDetails{ @@ -271,7 +286,7 @@ func TestGetAggregatedReports(t *testing.T) { t.Run("Should provide aggregated reports when batch size more than 1", func(t *testing.T) { conf.Set("Reporting.maxReportsCountInARequest", 10) assert.Equal(t, 10, reportHandle.maxReportsCountInARequest.Load()) - bucket, _ := reportHandle.getAggregationBucketMinute(28017690, 10) + bucket, _ := getAggregationBucketMinute(28017690, 10) expectedResponse := []*types.Metric{ { InstanceDetails: types.InstanceDetails{ @@ -351,7 +366,7 @@ func TestGetAggregatedReports(t *testing.T) { t.Run("Should provide aggregated reports when batch size is more than 1 and reports with same identifier are more then batch size", func(t *testing.T) { conf.Set("Reporting.maxReportsCountInARequest", 2) assert.Equal(t, 2, reportHandle.maxReportsCountInARequest.Load()) - bucket, _ := reportHandle.getAggregationBucketMinute(28017690, 10) + bucket, _ := getAggregationBucketMinute(28017690, 10) extraReport := &types.ReportByStatus{ 
InstanceDetails: types.InstanceDetails{ WorkspaceID: "some-workspace-id", @@ -486,707 +501,3 @@ func TestGetAggregatedReports(t *testing.T) { assert.Equal(t, expectedResponse, aggregatedMetrics) }) } - -func assertReportMetric(expectedMetric, actualMetric types.PUReportedMetric) { - Expect(expectedMetric.ConnectionDetails.SourceID).To(Equal(actualMetric.ConnectionDetails.SourceID)) - Expect(expectedMetric.ConnectionDetails.DestinationID).To(Equal(actualMetric.ConnectionDetails.DestinationID)) - Expect(expectedMetric.ConnectionDetails.SourceJobID).To(Equal(actualMetric.ConnectionDetails.SourceJobID)) - Expect(expectedMetric.ConnectionDetails.SourceJobRunID).To(Equal(actualMetric.ConnectionDetails.SourceJobRunID)) - Expect(expectedMetric.ConnectionDetails.SourceTaskRunID).To(Equal(actualMetric.ConnectionDetails.SourceTaskRunID)) - Expect(expectedMetric.PUDetails.InPU).To(Equal(actualMetric.PUDetails.InPU)) - Expect(expectedMetric.PUDetails.PU).To(Equal(actualMetric.PUDetails.PU)) - Expect(expectedMetric.PUDetails.TerminalPU).To(Equal(actualMetric.PUDetails.TerminalPU)) - Expect(expectedMetric.PUDetails.InitialPU).To(Equal(actualMetric.PUDetails.InitialPU)) - Expect(expectedMetric.StatusDetail.Status).To(Equal(actualMetric.StatusDetail.Status)) - Expect(expectedMetric.StatusDetail.StatusCode).To(Equal(actualMetric.StatusDetail.StatusCode)) - Expect(expectedMetric.StatusDetail.Count).To(Equal(actualMetric.StatusDetail.Count)) - Expect(expectedMetric.StatusDetail.SampleResponse).To(Equal(actualMetric.StatusDetail.SampleResponse)) - Expect(expectedMetric.StatusDetail.SampleEvent).To(Equal(actualMetric.StatusDetail.SampleEvent)) - Expect(expectedMetric.StatusDetail.EventName).To(Equal(actualMetric.StatusDetail.EventName)) - Expect(expectedMetric.StatusDetail.EventType).To(Equal(actualMetric.StatusDetail.EventType)) -} - -type getValTc struct { - inputStr string - expected string -} - -var tcs = []getValTc{ - { - inputStr: `{"response":"{\"message\":\"Primary key 'Contact Key' does not exist.\",\"errorcode\":10000,\"documentation\":\"\"}"}`, - expected: "Primary key 'Contact Key' does not exist.", - }, - { - inputStr: `{"response":"Event Edit_Order_Button_Clicked doesn't match with Snapchat Events!","firstAttemptedAt":"2023-03-30T16:58:05.628Z","content-type":"application/json"}`, - expected: "Event Edit_Order_Button_Clicked doesn't match with Snapchat Events!", - }, - { - inputStr: `{"response":"{\"status\":400,\"message\":\"Failed with Unsupported post request. Object with ID '669556453669016' does not exist, cannot be loaded due to missing permissions, or does not support this operation. Please read the Graph API documentation at https://developers.facebook.com/docs/graph-api during response transformation\",\"destinationResponse\":{\"error\":{\"message\":\"Unsupported post request. Object with ID '669556453669016' does not exist, cannot be loaded due to missing permissions, or does not support this operation. 
Please read the Graph API documentation at https://developers.facebook.com/docs/graph-api\",\"type\":\"GraphMethodException\",\"code\":100,\"error_subcode\":33,\"fbtrace_id\":\"AAjsXHCiypjAV50Vg-dZx4D\"},\"status\":400},\"statTags\":{\"errorCategory\":\"network\",\"errorType\":\"aborted\",\"destType\":\"FACEBOOK_PIXEL\",\"module\":\"destination\",\"implementation\":\"native\",\"feature\":\"dataDelivery\",\"destinationId\":\"2EXmEugZiMykYfFzoPnXjq6bJ6D\",\"workspaceId\":\"1vTJeDNwZx4bJF7c5ZUWPqJpMVx\",\"context\":\"[Native Integration Service] Failure During Processor Transform\"}}","firstAttemptedAt":"2023-03-30T17:39:17.638Z","content-type":"application/json"}`, - expected: "Unsupported post request. Object with ID '669556453669016' does not exist, cannot be loaded due to missing permissions, or does not support this operation. Please read the Graph API documentation at https://developers.facebook.com/docs/graph-api", - }, - { - inputStr: `{"response":"{\n \"meta\": {\n \"error\": \"Unauthorized request\"\n }\n}\n","firstAttemptedAt":"2023-03-30T17:39:00.377Z","content-type":"application/json; charset=utf-8"}`, - expected: "Unauthorized request", - }, - { - inputStr: `TypeError: Cannot set property 'event_name' of undefined`, - expected: `TypeError: Cannot set property 'event_name' of undefined`, - }, - { - inputStr: `{"response":"{\"code\":400,\"error\":\"Invalid API key: 88ea3fd9e15f74491ac6dd401c8733c9\"}\r\r\r\n","firstAttemptedAt":"2023-03-30T17:39:36.755Z","content-type":"application/json"}`, - expected: `Invalid API key: 88ea3fd9e15f74491ac6dd401c8733c9`, - }, - { - inputStr: `{"response":"{\"status\":\"error\",\"message\":\"Invalid input JSON on line 1, column 54: Cannot deserialize value of type ` + "`" + `java.lang.String` + "`" + ` from Object value (token ` + "`" + `JsonToken.START_OBJECT` + "`" + `)\",\"correlationId\":\"c73c9759-e9fe-4061-8da6-3d7a10159f54\"}","firstAttemptedAt":"2023-03-30T17:39:12.397Z","content-type":"application/json;charset=utf-8"}`, - expected: `Invalid input JSON on line 1, column 54: Cannot deserialize value of type ` + "`" + `java.lang.String` + "`" + ` from Object value (token ` + "`" + `JsonToken.START_OBJECT` + "`" + `)`, - }, - { - inputStr: `{"response":"{\"error\":\"Event request failed (Invalid callback parameters)\"}","firstAttemptedAt":"2023-03-30T17:38:36.152Z","content-type":"application/json; charset=utf-8"}`, - expected: `Event request failed (Invalid callback parameters)`, - }, - { - inputStr: `{"response":"{\"type\":\"error.list\",\"request_id\":\"0000ib5themn9npuifdg\",\"errors\":[{\"code\":\"not_found\",\"message\":\"User Not Found\"}]}","firstAttemptedAt":"2023-03-30T17:38:47.857Z","content-type":"application/json; charset=utf-8"}`, - expected: `User Not Found`, - }, - { - inputStr: `{"response":"{\"type\":\"error.list\",\"request_id\":\"0000ohm9i1s3euaavmmg\",\"errors\":[{\"code\":\"unauthorized\",\"message\":\"Access Token Invalid\"}]}","firstAttemptedAt":"2023-03-30T16:59:37.973Z","content-type":"application/json; charset=utf-8"}`, - expected: `Access Token Invalid`, - }, - { - inputStr: `{"response":"[CDM GOOGLESHEETS] Unable to create client for 21xmYbMXCovrqQn5BIszNvcYmur circuit breaker is open, last error: [GoogleSheets] error :: [GoogleSheets] error :: error in GoogleSheets while unmarshalling credentials json:: invalid character 'v' looking for beginning of value","firstAttemptedAt":"2023-03-30T16:58:32.010Z","content-type":""}`, - expected: `[CDM GOOGLESHEETS] Unable to create client for 21xmYbMXCovrqQn5BIszNvcYmur 
circuit breaker is open, last error: [GoogleSheets] error :: [GoogleSheets] error :: error in GoogleSheets while unmarshalling credentials json:: invalid character 'v' looking for beginning of value`, - }, - { - inputStr: `{"response":"{\n \"meta\": {\n \"errors\": [\n \"id attribute and identifier are not the same value\",\n \"value for attribute 'token' cannot be longer than 1000 bytes\"\n ]\n }\n}\n","firstAttemptedAt":"2023-03-30T17:31:01.599Z","content-type":"application/json; charset=utf-8"}`, - expected: `id attribute and identifier are not the same value.value for attribute 'token' cannot be longer than 1000 bytes`, - }, - { - inputStr: `{"response":"{\"errors\":[\"No users with this external_id found\"]}","firstAttemptedAt":"2023-03-30T17:37:58.184Z","content-type":"application/json; charset=utf-8"}`, - expected: `No users with this external_id found`, - }, - { - inputStr: `{"response":"{\"status\":500,\"destinationResponse\":{\"response\":\"[ECONNRESET] :: Connection reset by peer\",\"status\":500,\"rudderJobMetadata\":{\"jobId\":2942260148,\"attemptNum\":0,\"userId\":\"\",\"sourceId\":\"1qWeSkIiAhg3O96KzVVU5MieZHM\",\"destinationId\":\"2IpcvzcMgbLfXgGA4auDrtfEMuI\",\"workspaceId\":\"1nnEnc0tt7k9eFf7bz1GsiU0MFC\",\"secret\":null}},\"message\":\"[GA4 Response Handler] Request failed for destination ga4 with status: 500\",\"statTags\":{\"errorCategory\":\"network\",\"errorType\":\"retryable\",\"destType\":\"GA4\",\"module\":\"destination\",\"implementation\":\"native\",\"feature\":\"dataDelivery\",\"destinationId\":\"2IpcvzcMgbLfXgGA4auDrtfEMuI\",\"workspaceId\":\"1nnEnc0tt7k9eFf7bz1GsiU0MFC\"}}","firstAttemptedAt":"2023-03-30T17:22:26.488Z","content-type":"application/json"}`, - expected: `[GA4 Response Handler] Request failed for destination ga4 with status: 500`, - }, - { - inputStr: `{"response":"{\"status\":502,\"destinationResponse\":{\"response\":\"\u003chtml\u003e\\r\\n\u003chead\u003e\u003ctitle\u003e502 Bad Gateway\u003c/title\u003e\u003c/head\u003e\\r\\n\u003cbody\u003e\\r\\n\u003ccenter\u003e\u003ch1\u003e502 Bad Gateway\u003c/h1\u003e\u003c/center\u003e\\r\\n\u003c/body\u003e\\r\\n\u003c/html\u003e\\r\\n\",\"status\":502,\"rudderJobMetadata\":{\"jobId\":4946423471,\"attemptNum\":0,\"userId\":\"\",\"sourceId\":\"2CRIZp4OS10sjjGF3uF4guZuGmj\",\"destinationId\":\"2DstxLWX7Oi7gPdPM1CR9CikOko\",\"workspaceId\":\"1yaBlqltp5Y4V2NK8qePowlyafu\",\"secret\":null}},\"message\":\"Request failed with status: 502\",\"statTags\":{\"errorCategory\":\"network\",\"errorType\":\"retryable\",\"destType\":\"CLEVERTAP\",\"module\":\"destination\",\"implementation\":\"native\",\"feature\":\"dataDelivery\",\"destinationId\":\"2DstxLWX7Oi7gPdPM1CR9CikOko\",\"workspaceId\":\"1yaBlqltp5Y4V2NK8qePowlyafu\"}}","firstAttemptedAt":"2023-03-30T17:11:55.326Z","content-type":"application/json"}`, - expected: "Request failed with status: 502", - }, - { - inputStr: `{"response":"{\"status\":400,\"destinationResponse\":{\"response\":\"\u003c!DOCTYPE html\u003e\\n\u003chtml lang=en\u003e\\n \u003cmeta charset=utf-8\u003e\\n \u003cmeta name=viewport content=\\\"initial-scale=1, minimum-scale=1, width=device-width\\\"\u003e\\n \u003ctitle\u003eError 400 (Bad Request)!!1\u003c/title\u003e\\n \u003cstyle\u003e\\n *{margin:0;padding:0}html,code{font:15px/22px arial,sans-serif}html{background:#fff;color:#222;padding:15px}body{margin:7% auto 0;max-width:390px;min-height:180px;padding:30px 0 15px}* \u003e body{background:url(//www.google.com/images/errors/robot.png) 100% 5px 
no-repeat;padding-right:205px}p{margin:11px 0 22px;overflow:hidden}ins{color:#777;text-decoration:none}a img{border:0}@media screen and (max-width:772px){body{background:none;margin-top:0;max-width:none;padding-right:0}}#logo{background:url(//www.google.com/images/branding/googlelogo/1x/googlelogo_color_150x54dp.png) no-repeat;margin-left:-5px}@media only screen and (min-resolution:192dpi){#logo{background:url(//www.google.com/images/branding/googlelogo/2x/googlelogo_color_150x54dp.png) no-repeat 0% 0%/100% 100%;-moz-border-image:url(//www.google.com/images/branding/googlelogo/2x/googlelogo_color_150x54dp.png) 0}}@media only screen and (-webkit-min-device-pixel-ratio:2){#logo{background:url(//www.google.com/images/branding/googlelogo/2x/googlelogo_color_150x54dp.png) no-repeat;-webkit-background-size:100% 100%}}#logo{display:inline-block;height:54px;width:150px}\\n \u003c/style\u003e\\n \u003ca href=//www.google.com/\u003e\u003cspan id=logo aria-label=Google\u003e\u003c/span\u003e\u003c/a\u003e\\n \u003cp\u003e\u003cb\u003e400.\u003c/b\u003e \u003cins\u003eThat’s an error.\u003c/ins\u003e\\n \u003cp\u003eYour client has issued a malformed or illegal request. \u003cins\u003eThat’s all we know.\u003c/ins\u003e\\n\",\"status\":400,\"rudderJobMetadata\":{\"jobId\":597438246,\"attemptNum\":0,\"userId\":\"\",\"sourceId\":\"2E2b60bs1ybmIKuGvaVxtLT5lYo\",\"destinationId\":\"2E5xYQkj5OVrA3CWexvRLV4b7RH\",\"workspaceId\":\"1sUXvPs0hYgjBxSfjG4gqnRFNoP\",\"secret\":null}},\"message\":\"[GA4 Response Handler] Request failed for destination ga4 with status: 400\",\"statTags\":{\"errorCategory\":\"network\",\"errorType\":\"aborted\",\"destType\":\"GA4\",\"module\":\"destination\",\"implementation\":\"native\",\"feature\":\"dataDelivery\",\"destinationId\":\"2E5xYQkj5OVrA3CWexvRLV4b7RH\",\"workspaceId\":\"1sUXvPs0hYgjBxSfjG4gqnRFNoP\"}}","firstAttemptedAt":"2023-03-30T17:11:25.524Z","content-type":"application/json"}`, - expected: "[GA4 Response Handler] Request failed for destination ga4 with status: 400", - }, - { - inputStr: `{"response":"[GoogleSheets] error :: Failed to insert Payload :: This action would increase the number of cells in the workbook above the limit of 10000000 cells.","firstAttemptedAt":"2023-03-30T17:14:18.737Z","content-type":""}`, - expected: `[GoogleSheets] error :: Failed to insert Payload :: This action would increase the number of cells in the workbook above the limit of 10000000 cells.`, - }, - { - inputStr: `{"response":"{\"msg\":\"Project 9434: The request would increase the number of object fields to 19434 by adding [Application: Invoke Use Positions By Category Mutation, Application: Invoke Use Positions By Category Mutation.request, Application: Invoke Use Positions By Category Mutation.request.cacheID, Application: Invoke Use Positions By Category Mutation.request.metadata, Application: Invoke Use Positions By Category Mutation.request.name, Application: Invoke Use Positions By Category Mutation.request.operationKind, Application: Invoke Use Positions By Category Mutation.request.text, Application: Invoke Use Positions By Category Mutation.variables, Application: Invoke Use Positions By Category Mutation.variables.input, Application: Invoke Use Positions By Category Mutation.variables.input.gigPositionId, Application: Invoke Use Positions By Category Mutation.variables.input.status, Application: Invoke Use Positions By Category Mutation.variables.input.workerId, Application: Pressed Action Row, Application: Pressed Checkbox], which exceeds the limit of 
15000\",\"code\":\"UniqueFieldsLimitExceeded\",\"params\":{\"limit\":15000,\"requestedTotal\":19434,\"newFields\":[\"Application: Invoke Use Positions By Category Mutation\",\"Application: Invoke Use Positions By Category Mutation.request\",\"Application: Invoke Use Positions By Category Mutation.request.cacheID\",\"Application: Invoke Use Positions By Category Mutation.request.metadata\",\"Application: Invoke Use Positions By Category Mutation.request.name\",\"Application: Invoke Use Positions By Category Mutation.request.operationKind\",\"Application: Invoke Use Positions By Category Mutation.request.text\",\"Application: Invoke Use Positions By Category Mutation.variables\",\"Application: Invoke Use Positions By Category Mutation.variables.input\",\"Application: Invoke Use Positions By Category Mutation.variables.input.gigPositionId\",\"Application: Invoke Use Positions By Category Mutation.variables.input.status\",\"Application: Invoke Use Positions By Category Mutation.variables.input.workerId\",\"Application: Pressed Action Row\",\"Application: Pressed Checkbox\"]}}","firstAttemptedAt":"2023-03-30T17:39:32.189Z","content-type":"application/json"}`, - expected: `Project 9434: The request would increase the number of object fields to 19434 by adding [Application: Invoke Use Positions By Category Mutation, Application: Invoke Use Positions By Category Mutation.request, Application: Invoke Use Positions By Category Mutation.request.cacheID, Application: Invoke Use Positions By Category Mutation.request.metadata, Application: Invoke Use Positions By Category Mutation.request.name, Application: Invoke Use Positions By Category Mutation.request.operationKind, Application: Invoke Use Positions By Category Mutation.request.text, Application: Invoke Use Positions By Category Mutation.variables, Application: Invoke Use Positions By Category Mutation.variables.input, Application: Invoke Use Positions By Category Mutation.variables.input.gigPositionId, Application: Invoke Use Positions By Category Mutation.variables.input.status, Application: Invoke Use Positions By Category Mutation.variables.input.workerId, Application: Pressed Action Row, Application: Pressed Checkbox], which exceeds the limit of 15000`, - }, - { - inputStr: `{"response":"{\"status\":400,\"destinationResponse\":\"\",\"message\":\"Unable to find conversionActionId for conversion:Order Completed\",\"statTags\":{\"errorCategory\":\"network\",\"errorType\":\"aborted\",\"meta\":\"instrumentation\",\"destType\":\"GOOGLE_ADWORDS_ENHANCED_CONVERSIONS\",\"module\":\"destination\",\"implementation\":\"native\",\"feature\":\"dataDelivery\",\"destinationId\":\"2DiY9INMJbBzfCdjgRlvYzsUl57\",\"workspaceId\":\"1zffyLlFcWBMmv4vvtYldWxEdGg\"}}","firstAttemptedAt":"2023-03-30T17:32:47.268Z","content-type":"application/json"}`, - expected: `Unable to find conversionActionId for conversion:Order Completed`, - }, - { - inputStr: `{"response":"{\"status\":400,\"destinationResponse\":{\"response\":[{\"duplicateResut\":{\"allowSave\":true,\"duplicateRule\":\"Contacts_DR\",\"duplicateRuleEntityType\":\"Contact\",\"errorMessage\":\"You're creating a duplicate record. 
We recommend you use an existing record instead.\",\"matchResults\":[{\"entityType\":\"Contact\",\"errors\":[],\"matchEngine\":\"ExactMatchEngine\",\"matchRecords\":[{\"additionalInformation\":[],\"fieldDiffs\":[],\"matchConfidence\":100,\"record\":{\"attributes\":{\"type\":\"Contact\",\"url\":\"/services/data/v50.0/sobjects/Contact/0031i000013x2TEAAY\"},\"Id\":\"0031i000013x2TEAAY\"}}],\"rule\":\"Contact_MR\",\"size\":1,\"success\":true}]},\"errorCode\":\"DUPLICATES_DETECTED\",\"message\":\"You're creating a duplicate record. We recommend you use an existing record instead.\"}],\"status\":400,\"rudderJobMetadata\":{\"jobId\":1466739020,\"attemptNum\":0,\"userId\":\"\",\"sourceId\":\"2JF2LaBUedeOfAyt9EoIVJylkKS\",\"destinationId\":\"2JJDsqHbkuIZ89ldOBMaBjPi9L7\",\"workspaceId\":\"26TTcz2tQucRs2xZiGThQzGRk2l\",\"secret\":null,\"destInfo\":{\"authKey\":\"2JJDsqHbkuIZ89ldOBMaBjPi9L7\"}}},\"message\":\"Salesforce Request Failed: \\\"400\\\" due to \\\"You're creating a duplicate record. We recommend you use an existing record instead.\\\", (Aborted) during Salesforce Response Handling\",\"statTags\":{\"errorCategory\":\"network\",\"errorType\":\"aborted\",\"destType\":\"SALESFORCE\",\"module\":\"destination\",\"implementation\":\"native\",\"feature\":\"dataDelivery\",\"destinationId\":\"2JJDsqHbkuIZ89ldOBMaBjPi9L7\",\"workspaceId\":\"26TTcz2tQucRs2xZiGThQzGRk2l\"}}","firstAttemptedAt":"2023-03-30T17:07:52.359Z","content-type":"application/json"}`, - expected: `You're creating a duplicate record. We recommend you use an existing record instead.`, - }, - { - inputStr: `{"response":"\u003c!--\n ~ Copyright (C) 2010-2021 Evergage, Inc.\n ~ All rights reserved.\n --\u003e\n\n\u003c!DOCTYPE html\u003e\n\u003chtml lang=\"en\"\u003e\n\u003chead\u003e\n \u003cmeta charset=\"UTF-8\"\u003e\n \u003ctitle\u003eSalesforce Personalization\u003c/title\u003e\n \u003clink rel=\"icon\" type=\"image/x-icon\" href=\"https://www.salesforce.com/etc/designs/sfdc-www/en_us/favicon.ico\"/\u003e\n \u003clink rel=\"shortcut icon\" type=\"image/x-icon\" href=\"https://www.salesforce.com/etc/designs/sfdc-www/en_us/favicon.ico\"/\u003e\n \u003cstyle\u003e\n body { font-family: Salesforce Sans,Arial,sans-serif; text-align: center; padding: 50px; background-color:#fff; }\n h1 { font-size: 1.25rem; color: #080707; text-align: center; margin-top: -0.5rem; }\n p { font-size: 0.8125rem; color: #3E3E3C; text-align:center; }\n \u003c/style\u003e\n\u003c/head\u003e\n\u003cbody\u003e\n \u003cdiv align=”center”\u003e\n \u003cimg src=\"/PageNotAvailable.svg\" /\u003e\n \u003c/div\u003e\n \u003cdiv align=”center”\u003e\n \u003ch1\u003eThe page you want isn’t available.\u003c/h1\u003e\n \u003cp\u003eTo find the page you want, use the main navigation.\u003c/p\u003e\n \u003c/div\u003e\n\u003c/body\u003e\n\u003c/html\u003e","firstAttemptedAt":"2024-09-02T06:57:13.829Z","content-type":"text/html"}`, - expected: "The page you want isn’t available.\r\n\r\n To find the page you want, use the main navigation.\r\n\r\n ", - }, - { - inputStr: `{"response":"{\"status\":\"fail\",\"processed\":0,\"unprocessed\":[{\"status\":\"fail\",\"code\":513,\"error\":\"Event Name is incorrect. ErrorCode: 513 - Trying to raise a restricted system event. 
Skipped record number : 1\",\"record\":{\"evtData\":{\"initial_referrer\":\"https://www.google.com/\",\"initial_referring_domain\":\"www.google.com\",\"path\":\"/busca/\",\"referrer\":\"https://www.extrabom.com.br/busca/?q=Bombril\u0026anonymous=347f65ea66096fd7db4e1bd88211a83dfbe263b78da6a5de0261d160c54100ba\",\"referring_domain\":\"www.extrabom.com.br\",\"search\":\"?q=X14\u0026anonymous=347f65ea66096fd7db4e1bd88211a83dfbe263b78da6a5de0261d160c54100ba\",\"tab_url\":\"https://www.extrabom.com.br/busca/?q=X14\u0026anonymous=347f65ea66096fd7db4e1bd88211a83dfbe263b78da6a5de0261d160c54100ba\",\"title\":\"X14 - Busca - Extrabom\",\"url\":\"https://www.extrabom.com.br/busca/?q=X14\"},\"evtName\":\"Web Page Viewed\",\"identity\":\"69298\",\"type\":\"event\"}}]}","firstAttemptedAt":"2024-09-02T00:40:06.451Z","content-type":"application/json"}`, - expected: "Event Name is incorrect. ErrorCode: 513 - Trying to raise a restricted system event. Skipped record number : 1", - }, - { - inputStr: `{"response":"{\"destinationResponse\":\"\u003c!DOCTYPE html\u003e\\n\u003chtml lang=\\\"en\\\" id=\\\"facebook\\\"\u003e\\n \u003chead\u003e\\n \u003ctitle\u003eFacebook | Error\u003c/title\u003e\\n \u003cmeta charset=\\\"utf-8\\\"\u003e\\n \u003cmeta http-equiv=\\\"cache-control\\\" content=\\\"no-cache\\\"\u003e\\n \u003cmeta http-equiv=\\\"cache-control\\\" content=\\\"no-store\\\"\u003e\\n \u003cmeta http-equiv=\\\"cache-control\\\" content=\\\"max-age=0\\\"\u003e\\n \u003cmeta http-equiv=\\\"expires\\\" content=\\\"-1\\\"\u003e\\n \u003cmeta http-equiv=\\\"pragma\\\" content=\\\"no-cache\\\"\u003e\\n \u003cmeta name=\\\"robots\\\" content=\\\"noindex,nofollow\\\"\u003e\\n \u003cstyle\u003e\\n html, body {\\n color: #141823;\\n background-color: #e9eaed;\\n font-family: Helvetica, Lucida Grande, Arial,\\n Tahoma, Verdana, sans-serif;\\n margin: 0;\\n padding: 0;\\n text-align: center;\\n }\\n\\n #header {\\n height: 30px;\\n padding-bottom: 10px;\\n padding-top: 10px;\\n text-align: center;\\n }\\n\\n #icon {\\n width: 30px;\\n }\\n\\n h1 {\\n font-size: 18px;\\n }\\n\\n p {\\n font-size: 13px;\\n }\\n\\n #footer {\\n border-top: 1px solid #ddd;\\n color: #9197a3;\\n font-size: 12px;\\n padding: 5px 8px 6px 0;\\n }\\n \u003c/style\u003e\\n \u003c/head\u003e\\n \u003cbody\u003e\\n \u003cdiv id=\\\"header\\\"\u003e\\n \u003ca href=\\\"//www.facebook.com/\\\"\u003e\\n \u003cimg id=\\\"icon\\\" src=\\\"//static.facebook.com/images/logos/facebook_2x.png\\\" /\u003e\\n \u003c/a\u003e\\n \u003c/div\u003e\\n \u003cdiv id=\\\"core\\\"\u003e\\n \u003ch1 id=\\\"sorry\\\"\u003eSorry, something went wrong.\u003c/h1\u003e\\n \u003cp id=\\\"promise\\\"\u003e\\n We're working on it and we'll get it fixed as soon as we can.\\n \u003c/p\u003e\\n \u003cp id=\\\"back-link\\\"\u003e\\n \u003ca id=\\\"back\\\" href=\\\"//www.facebook.com/\\\"\u003eGo Back\u003c/a\u003e\\n \u003c/p\u003e\\n \u003cdiv id=\\\"footer\\\"\u003e\\n Facebook\\n \u003cspan id=\\\"copyright\\\"\u003e\\n \u0026copy; 2022\\n \u003c/span\u003e\\n \u003cspan id=\\\"help-link\\\"\u003e\\n \u0026#183;\\n \u003ca id=\\\"help\\\" href=\\\"//www.facebook.com/help/\\\"\u003eHelp Center\u003c/a\u003e\\n \u003c/span\u003e\\n \u003c/div\u003e\\n \u003c/div\u003e\\n \u003cscript\u003e\\n document.getElementById('back').onclick = function() {\\n if (history.length \u003e 1) {\\n history.back();\\n return false;\\n }\\n };\\n\\n // Adjust the display based on the window size\\n if (window.innerHeight \u003c 80 || window.innerWidth \u003c 80) {\\n // Blank if window is 
too small\\n document.body.style.display = 'none';\\n };\\n if (window.innerWidth \u003c 200 || window.innerHeight \u003c 150) {\\n document.getElementById('back-link').style.display = 'none';\\n document.getElementById('help-link').style.display = 'none';\\n };\\n if (window.innerWidth \u003c 200) {\\n document.getElementById('sorry').style.fontSize = '16px';\\n };\\n if (window.innerWidth \u003c 150) {\\n document.getElementById('promise').style.display = 'none';\\n };\\n if (window.innerHeight \u003c 150) {\\n document.getElementById('sorry').style.margin = '4px 0 0 0';\\n document.getElementById('sorry').style.fontSize = '14px';\\n document.getElementById('promise').style.display = 'none';\\n };\\n \u003c/script\u003e\\n \u003c/body\u003e\\n\u003c/html\u003e\\n\",\"message\":\"Request Processed Successfully\",\"status\":502}","firstAttemptedAt":"2023-03-30T17:31:41.884Z","content-type":"application/json"}`, - expected: "Request Processed Successfully", - }, - { - inputStr: `{"response":"{\"status\":402,\"message\":\"[Generic Response Handler] Request failed for destination active_campaign with status: 402\",\"destinationResponse\":{\"response\":\"\",\"status\":402,\"rudderJobMetadata\":{\"jobId\":38065100,\"attemptNum\":0,\"userId\":\"\",\"sourceId\":\"1tzHYMqZp5Cogqe4EeagdfpM7Y3\",\"destinationId\":\"1uFSe3P8gJnacuYQIhWFhRfvCGQ\",\"workspaceId\":\"1tyLn8D94vS107gQwxsmhZaFfP2\",\"secret\":null}},\"statTags\":{\"errorCategory\":\"network\",\"errorType\":\"aborted\",\"destType\":\"ACTIVE_CAMPAIGN\",\"module\":\"destination\",\"implementation\":\"native\",\"feature\":\"dataDelivery\",\"destinationId\":\"1uFSe3P8gJnacuYQIhWFhRfvCGQ\",\"workspaceId\":\"1tyLn8D94vS107gQwxsmhZaFfP2\",\"context\":\"[Native Integration Service] Failure During Processor Transform\"}}","firstAttemptedAt":"2023-03-30T17:03:40.883Z","content-type":"application/json"}`, - expected: "[Generic Response Handler] Request failed for destination active_campaign with status: 402", - }, - { - inputStr: `{"response":"{\"message\":\"Valid data must be provided in the 'attributes', 'events', or 'purchases' fields.\",\"errors\":[{\"type\":\"'external_id' or 'braze_id' or 'user_alias' is required\",\"input_array\":\"attributes\",\"index\":0}]}","firstAttemptedAt":"2023-03-30T17:26:36.847Z","content-type":"application/json"}`, - expected: "Valid data must be provided in the 'attributes', 'events', or 'purchases' fields.", - }, - { - inputStr: `{"fetching_remote_schema_failed":{"attempt":6,"errors":["dial tcp 159.223.171.199:38649: connect: connection refused","dial tcp 159.223.171.199:38649: connect: connection refused","dial tcp 159.223.171.199:38649: connect: connection refused","dial tcp 159.223.171.199:38649: connect: connection refused","dial tcp 159.223.171.199:38649: connect: connection refused","dial tcp 159.223.171.199:38649: connect: connection refused"]}}`, - expected: `dial tcp 159.223.171.199:38649: connect: connection refused`, - }, - { - inputStr: `{"exporting_data_failed":{"attempt":5,"errors":["2 errors occurred:\n1 errors occurred:\nloading identifies table: inserting into original table: pq: Value out of range for 4 bytes.\n1 errors occurred:\nupdate schema: adding columns to warehouse: failed to add columns for table logout in namespace raw_gtm of destination RS:2MeRrhS670OOhZv6gBezLipeVtm with error: pq: must be owner of relation logout","2 errors occurred:\n1 errors occurred:\nupdate schema: adding columns to warehouse: failed to add columns for table logout in namespace raw_gtm of destination 
RS:2MeRrhS670OOhZv6gBezLipeVtm with error: pq: must be owner of relation logout\n1 errors occurred:\nloading identifies table: inserting into original table: pq: Value out of range for 4 bytes.","2 errors occurred:\n1 errors occurred:\nupdate schema: adding columns to warehouse: failed to add columns for table logout in namespace raw_gtm of destination RS:2MeRrhS670OOhZv6gBezLipeVtm with error: pq: must be owner of relation logout\n1 errors occurred:\nloading identifies table: inserting into original table: pq: Value out of range for 4 bytes.","2 errors occurred:\n1 errors occurred:\nupdate schema: adding columns to warehouse: failed to add columns for table logout in namespace raw_gtm of destination RS:2MeRrhS670OOhZv6gBezLipeVtm with error: pq: must be owner of relation logout\n1 errors occurred:\nloading identifies table: inserting into original table: pq: Value out of range for 4 bytes.","2 errors occurred:\n1 errors occurred:\nupdate schema: adding columns to warehouse: failed to add columns for table logout in namespace raw_gtm of destination RS:2MeRrhS670OOhZv6gBezLipeVtm with error: pq: must be owner of relation logout\n1 errors occurred:\nloading identifies table: inserting into original table: pq: Value out of range for 4 bytes."]}}`, - expected: "2 errors occurred:\n1 errors occurred:\nloading identifies table: inserting into original table: pq: Value out of range for 4 bytes.\n1 errors occurred:\nupdate schema: adding columns to warehouse: failed to add columns for table logout in namespace raw_gtm of destination RS:2MeRrhS670OOhZv6gBezLipeVtm with error: pq: must be owner of relation logout.2 errors occurred:\n1 errors occurred:\nupdate schema: adding columns to warehouse: failed to add columns for table logout in namespace raw_gtm of destination RS:2MeRrhS670OOhZv6gBezLipeVtm with error: pq: must be owner of relation logout\n1 errors occurred:\nloading identifies table: inserting into original table: pq: Value out of range for 4 bytes.", - }, - { - inputStr: `{"response":"{\n\t\t\t\tError: invalid character 'P' looking for beginning of value,\n\t\t\t\t(trRespStCd, trRespBody): (504, Post \"http://transformer.rudder-us-east-1b-blue/v0/destinations/google_adwords_enhanced_conversions/proxy\": context deadline exceeded (Client.Timeout exceeded while awaiting headers)),\n\t\t\t}","firstAttemptedAt":"2023-03-30T17:24:58.068Z","content-type":"text/plain; charset=utf-8"}`, - expected: "{\n\t\t\t\tError: invalid character 'P' looking for beginning of value,\n\t\t\t\t(trRespStCd, trRespBody): (504, Post \"http://transformer.rudder-us-east-1b-blue/v0/destinations/google_adwords_enhanced_conversions/proxy\": context deadline exceeded (Client.Timeout exceeded while awaiting headers)),\n\t\t\t}", - }, - { - inputStr: `{"error":"{\"message\":\"some random message\",\"destinationResponse\":{\"error\":{\"message\":\"Unhandled random error\",\"type\":\"RandomException\",\"code\":5,\"error_subcode\":12,\"fbtrace_id\":\"facebook_px_trace_id_10\"},\"status\":412}}","firstAttemptedAt":"2023-04-20T17:24:58.068Z","content-type":"text/plain; charset=utf-8"}`, - expected: "Unhandled random error", - }, - { - inputStr: `{"error":"unknown error occurred","firstAttemptedAt":"2023-04-21T17:24:58.068Z","content-type":"text/plain; charset=utf-8"}`, - expected: "unknown error occurred", - }, -} - -func TestGetErrorMessageFromResponse(t *testing.T) { - ext := NewErrorDetailExtractor(logger.NOP) - - for i, tc := range tcs { - t.Run(fmt.Sprintf("payload-%v", i), func(t *testing.T) { - msg := 
ext.GetErrorMessage(tc.inputStr) - require.Equal(t, tc.expected, msg) - }) - } -} - -func TestExtractErrorDetails(t *testing.T) { - type depTcOutput struct { - errorMsg string - errorCode string - } - type depTc struct { - caseDescription string - inputErrMsg string - output depTcOutput - statTags map[string]string - } - testCases := []depTc{ - { - caseDescription: "should validate the deprecation correctly", - inputErrMsg: "Offline Conversions API is deprecated from onwards. Please use Conversions API, which is the latest version that supports Offline Conversions API and can be used until.", - output: depTcOutput{ - errorMsg: "Offline Conversions API is deprecated from onwards Please use Conversions API which is the latest version that supports Offline Conversions API and can be used until ", - errorCode: "deprecation", - }, - }, - { - caseDescription: "should validate the deprecation correctly even though we have upper-case keywords", - inputErrMsg: "Offline Conversions API is DeprEcated from onwards. Please use Conversions API, which is the latest version that supports Offline Conversions API and can be used until.", - output: depTcOutput{ - errorMsg: "Offline Conversions API is DeprEcated from onwards Please use Conversions API which is the latest version that supports Offline Conversions API and can be used until ", - errorCode: "deprecation", - }, - }, - { - caseDescription: "should use statTags to compute errorCode", - statTags: map[string]string{ - "errorCategory": "dataValidation", - "errorType": "configuration", - }, - inputErrMsg: "Some error", - output: depTcOutput{ - errorMsg: "Some error", - errorCode: "dataValidation:configuration", - }, - }, - } - - edr := NewErrorDetailReporter(context.Background(), &configSubscriber{}, stats.NOP, config.Default) - for _, tc := range testCases { - t.Run(tc.caseDescription, func(t *testing.T) { - errorDetails := edr.extractErrorDetails(tc.inputErrMsg, tc.statTags) - - require.Equal(t, tc.output.errorMsg, errorDetails.ErrorMessage) - require.Equal(t, tc.output.errorCode, errorDetails.ErrorCode) - }) - } -} - -func TestCleanUpErrorMessage(t *testing.T) { - ext := NewErrorDetailExtractor(logger.NOP) - type testCase struct { - inputStr string - expected string - } - - testCases := []testCase{ - {inputStr: "Object with ID '123983489734' is not a valid object", expected: "Object with ID is not a valid object"}, - {inputStr: "http://xyz-rudder.com/v1/endpoint not reachable: context deadline exceeded", expected: " not reachable context deadline exceeded"}, - {inputStr: "http://xyz-rudder.com/v1/endpoint not reachable 172.22.22.10: EOF", expected: " not reachable EOF"}, - {inputStr: "Request failed to process from 16-12-2022:19:30:23T+05:30 due to internal server error", expected: "Request failed to process from due to internal server error"}, - {inputStr: "User with email 'vagor12@bing.com' is not valid", expected: "User with email is not valid"}, - {inputStr: "Allowed timestamp is [15 minutes] into the future", expected: "Allowed timestamp is minutes into the future"}, - } - for i, tCase := range testCases { - t.Run(fmt.Sprintf("Case-%d", i), func(t *testing.T) { - actual := ext.CleanUpErrorMessage(tCase.inputStr) - require.Equal(t, tCase.expected, actual) - }) - } -} - -func BenchmarkJsonNestedSearch(b *testing.B) { - extractor := NewErrorDetailExtractor(logger.NOP) - - b.Run("JsonNested used fn", func(b *testing.B) { - for i := 0; i < len(tcs); i++ { - extractor.GetErrorMessage(tcs[i].inputStr) - } - }) -} - -func TestAggregationLogic(t 
*testing.T) { - dbErrs := []*types.EDReportsDB{ - { - PU: "dest_transformer", - EDInstanceDetails: types.EDInstanceDetails{ - WorkspaceID: "wsp1", - InstanceID: "instance-1", - Namespace: "nmspc", - }, - EDConnectionDetails: types.EDConnectionDetails{ - SourceID: "src-1", - SourceDefinitionId: "src-def-1", - DestinationDefinitionId: "des-def-1", - DestinationID: "des-1", - DestType: "DES_1", - }, - EDErrorDetails: types.EDErrorDetails{ - EDErrorDetailsKey: types.EDErrorDetailsKey{ - StatusCode: 200, - ErrorCode: "", - ErrorMessage: "", - EventType: "identify", - }, - }, - ReportMetadata: types.ReportMetadata{ - ReportedAt: 124335445, - }, - Count: 10, - }, - { - PU: "dest_transformer", - EDInstanceDetails: types.EDInstanceDetails{ - WorkspaceID: "wsp1", - InstanceID: "instance-1", - Namespace: "nmspc", - }, - EDConnectionDetails: types.EDConnectionDetails{ - SourceID: "src-1", - SourceDefinitionId: "src-def-1", - DestinationDefinitionId: "des-def-1", - DestinationID: "des-1", - DestType: "DES_1", - }, - EDErrorDetails: types.EDErrorDetails{ - EDErrorDetailsKey: types.EDErrorDetailsKey{ - StatusCode: 400, - ErrorCode: "", - ErrorMessage: "bad data sent for transformation", - EventType: "identify", - }, - }, - ReportMetadata: types.ReportMetadata{ - ReportedAt: 124335445, - }, - Count: 5, - }, - { - PU: "dest_transformer", - EDInstanceDetails: types.EDInstanceDetails{ - WorkspaceID: "wsp1", - InstanceID: "instance-1", - Namespace: "nmspc", - }, - EDConnectionDetails: types.EDConnectionDetails{ - SourceID: "src-1", - SourceDefinitionId: "src-def-1", - DestinationDefinitionId: "des-def-1", - DestinationID: "des-1", - DestType: "DES_1", - }, - EDErrorDetails: types.EDErrorDetails{ - EDErrorDetailsKey: types.EDErrorDetailsKey{ - StatusCode: 400, - ErrorCode: "", - ErrorMessage: "bad data sent for transformation", - EventType: "identify", - }, - }, - ReportMetadata: types.ReportMetadata{ - ReportedAt: 124335445, - }, - Count: 15, - }, - { - PU: "dest_transformer", - EDInstanceDetails: types.EDInstanceDetails{ - WorkspaceID: "wsp1", - InstanceID: "instance-1", - Namespace: "nmspc", - }, - EDConnectionDetails: types.EDConnectionDetails{ - SourceID: "src-1", - SourceDefinitionId: "src-def-1", - DestinationDefinitionId: "des-def-1", - DestinationID: "des-1", - DestType: "DES_1", - }, - EDErrorDetails: types.EDErrorDetails{ - EDErrorDetailsKey: types.EDErrorDetailsKey{ - StatusCode: 400, - ErrorCode: "", - ErrorMessage: "user_id information missing", - EventType: "identify", - }, - }, - ReportMetadata: types.ReportMetadata{ - ReportedAt: 124335446, - }, - Count: 20, - }, - // error occurred at router level(assume this is batching enabled) - { - PU: "router", - EDInstanceDetails: types.EDInstanceDetails{ - WorkspaceID: "wsp1", - InstanceID: "instance-1", - Namespace: "nmspc", - }, - EDConnectionDetails: types.EDConnectionDetails{ - SourceID: "src-1", - SourceDefinitionId: "src-def-1", - DestinationDefinitionId: "des-def-1", - DestinationID: "des-1", - DestType: "DES_1", - }, - EDErrorDetails: types.EDErrorDetails{ - EDErrorDetailsKey: types.EDErrorDetailsKey{ - StatusCode: 500, - ErrorCode: "", - ErrorMessage: "Cannot read type property of undefined", // some error during batching - EventType: "identify", - }, - }, - ReportMetadata: types.ReportMetadata{ - ReportedAt: 124335446, - }, - Count: 15, - }, - } - configSubscriber := newConfigSubscriber(logger.NOP) - ed := NewErrorDetailReporter(context.Background(), configSubscriber, stats.NOP, config.Default) - reportingMetrics := ed.aggregate(dbErrs) - - 
reportResults := []*types.EDMetric{ - { - PU: dbErrs[0].PU, - EDInstanceDetails: types.EDInstanceDetails{ - WorkspaceID: dbErrs[0].WorkspaceID, - InstanceID: dbErrs[0].InstanceID, - Namespace: dbErrs[0].Namespace, - }, - EDConnectionDetails: types.EDConnectionDetails{ - SourceID: dbErrs[0].SourceID, - SourceDefinitionId: dbErrs[0].SourceDefinitionId, - DestinationDefinitionId: dbErrs[0].DestinationDefinitionId, - DestinationID: dbErrs[0].DestinationID, - DestType: dbErrs[0].DestType, - }, - ReportMetadata: types.ReportMetadata{ - ReportedAt: dbErrs[0].ReportedAt * 60 * 1000, - }, - Errors: []types.EDErrorDetails{ - { - EDErrorDetailsKey: types.EDErrorDetailsKey{ - StatusCode: dbErrs[0].StatusCode, - ErrorCode: dbErrs[0].ErrorCode, - ErrorMessage: dbErrs[0].ErrorMessage, - EventType: dbErrs[0].EventType, - }, - ErrorCount: 10, - }, - { - EDErrorDetailsKey: types.EDErrorDetailsKey{ - StatusCode: dbErrs[1].StatusCode, - ErrorCode: dbErrs[1].ErrorCode, - ErrorMessage: dbErrs[1].ErrorMessage, - EventType: dbErrs[1].EventType, - }, - ErrorCount: 20, - }, - }, - }, - { - PU: dbErrs[3].PU, - EDInstanceDetails: types.EDInstanceDetails{ - WorkspaceID: dbErrs[3].WorkspaceID, - InstanceID: dbErrs[3].InstanceID, - Namespace: dbErrs[3].Namespace, - }, - EDConnectionDetails: types.EDConnectionDetails{ - SourceID: dbErrs[3].SourceID, - SourceDefinitionId: dbErrs[3].SourceDefinitionId, - DestinationDefinitionId: dbErrs[3].DestinationDefinitionId, - DestinationID: dbErrs[3].DestinationID, - DestType: dbErrs[3].DestType, - }, - ReportMetadata: types.ReportMetadata{ - ReportedAt: dbErrs[3].ReportedAt * 60 * 1000, - }, - Errors: []types.EDErrorDetails{ - { - EDErrorDetailsKey: types.EDErrorDetailsKey{ - StatusCode: dbErrs[3].StatusCode, - ErrorCode: dbErrs[3].ErrorCode, - ErrorMessage: dbErrs[3].ErrorMessage, - EventType: dbErrs[3].EventType, - }, - ErrorCount: 20, - }, - }, - }, - { - PU: dbErrs[4].PU, - EDInstanceDetails: types.EDInstanceDetails{ - WorkspaceID: dbErrs[4].WorkspaceID, - InstanceID: dbErrs[4].InstanceID, - Namespace: dbErrs[4].Namespace, - }, - EDConnectionDetails: types.EDConnectionDetails{ - SourceID: dbErrs[4].SourceID, - SourceDefinitionId: dbErrs[4].SourceDefinitionId, - DestinationDefinitionId: dbErrs[4].DestinationDefinitionId, - DestinationID: dbErrs[4].DestinationID, - DestType: dbErrs[4].DestType, - }, - ReportMetadata: types.ReportMetadata{ - ReportedAt: dbErrs[4].ReportedAt * 60 * 1000, - }, - Errors: []types.EDErrorDetails{ - { - EDErrorDetailsKey: types.EDErrorDetailsKey{ - StatusCode: dbErrs[4].StatusCode, - ErrorCode: dbErrs[4].ErrorCode, - ErrorMessage: dbErrs[4].ErrorMessage, - EventType: dbErrs[4].EventType, - }, - ErrorCount: 15, - }, - }, - }, - } - - require.Equal(t, reportResults, reportingMetrics) -} - -func TestGetAggregationBucket(t *testing.T) { - conf := config.New() - configSubscriber := newConfigSubscriber(logger.NOP) - reportHandle := NewDefaultReporter(context.Background(), conf, logger.NOP, configSubscriber, stats.NOP) - t.Run("should return the correct aggregation bucket with default interval of 1 mintue", func(t *testing.T) { - cases := []struct { - reportedAt int64 - bucketStart int64 - bucketEnd int64 - }{ - { - reportedAt: time.Date(2022, 1, 1, 10, 5, 10, 40, time.UTC).Unix() / 60, - bucketStart: time.Date(2022, 1, 1, 10, 5, 0, 0, time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 1, 1, 10, 6, 0, 0, time.UTC).Unix() / 60, - }, - { - reportedAt: time.Date(2022, 2, 4, 11, 5, 59, 10, time.UTC).Unix() / 60, - bucketStart: time.Date(2022, 2, 4, 11, 5, 0, 0, 
time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 2, 4, 11, 6, 0, 0, time.UTC).Unix() / 60, - }, - { - reportedAt: time.Date(2022, 3, 5, 12, 59, 59, 59, time.UTC).Unix() / 60, - bucketStart: time.Date(2022, 3, 5, 12, 59, 0, 0, time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 3, 5, 13, 0, 0, 0, time.UTC).Unix() / 60, - }, - { - reportedAt: time.Date(2022, 4, 6, 13, 0, 0, 0, time.UTC).Unix() / 60, - bucketStart: time.Date(2022, 4, 6, 13, 0, 0, 0, time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 4, 6, 13, 1, 0, 0, time.UTC).Unix() / 60, - }, - } - - for _, c := range cases { - bs, be := reportHandle.getAggregationBucketMinute(c.reportedAt, 1) - require.Equal(t, c.bucketStart, bs) - require.Equal(t, c.bucketEnd, be) - } - }) - - t.Run("should return the correct aggregation bucket with aggregation interval of 5 mintue", func(t *testing.T) { - cases := []struct { - reportedAt int64 - bucketStart int64 - bucketEnd int64 - }{ - { - reportedAt: time.Date(2022, 1, 1, 10, 5, 10, 40, time.UTC).Unix() / 60, - bucketStart: time.Date(2022, 1, 1, 10, 5, 0, 0, time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 1, 1, 10, 10, 0, 0, time.UTC).Unix() / 60, - }, - { - reportedAt: time.Date(2022, 2, 4, 11, 5, 59, 10, time.UTC).Unix() / 60, - bucketStart: time.Date(2022, 2, 4, 11, 5, 0, 0, time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 2, 4, 11, 10, 0, 0, time.UTC).Unix() / 60, - }, - { - reportedAt: time.Date(2022, 4, 6, 13, 7, 30, 11, time.UTC).Unix() / 60, - bucketStart: time.Date(2022, 4, 6, 13, 5, 0, 0, time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 4, 6, 13, 10, 0, 0, time.UTC).Unix() / 60, - }, - { - reportedAt: time.Date(2022, 4, 6, 13, 8, 50, 30, time.UTC).Unix() / 60, - bucketStart: time.Date(2022, 4, 6, 13, 5, 0, 0, time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 4, 6, 13, 10, 0, 0, time.UTC).Unix() / 60, - }, - { - reportedAt: time.Date(2022, 4, 6, 13, 9, 5, 15, time.UTC).Unix() / 60, - bucketStart: time.Date(2022, 4, 6, 13, 5, 0, 0, time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 4, 6, 13, 10, 0, 0, time.UTC).Unix() / 60, - }, - { - reportedAt: time.Date(2022, 3, 5, 12, 55, 53, 1, time.UTC).Unix() / 60, - bucketStart: time.Date(2022, 3, 5, 12, 55, 0, 0, time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 3, 5, 13, 0, 0, 0, time.UTC).Unix() / 60, - }, - { - reportedAt: time.Date(2022, 3, 5, 12, 57, 53, 1, time.UTC).Unix() / 60, - bucketStart: time.Date(2022, 3, 5, 12, 55, 0, 0, time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 3, 5, 13, 0, 0, 0, time.UTC).Unix() / 60, - }, - { - reportedAt: time.Date(2022, 3, 5, 12, 59, 59, 59, time.UTC).Unix() / 60, - bucketStart: time.Date(2022, 3, 5, 12, 55, 0, 0, time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 3, 5, 13, 0, 0, 0, time.UTC).Unix() / 60, - }, - { - reportedAt: time.Date(2022, 4, 6, 13, 0, 0, 0, time.UTC).Unix() / 60, - bucketStart: time.Date(2022, 4, 6, 13, 0, 0, 0, time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 4, 6, 13, 5, 0, 0, time.UTC).Unix() / 60, - }, - } - - for _, c := range cases { - bs, be := reportHandle.getAggregationBucketMinute(c.reportedAt, 5) - require.Equal(t, c.bucketStart, bs) - require.Equal(t, c.bucketEnd, be) - } - }) - - t.Run("should return the correct aggregation bucket with aggregation interval of 15 mintue", func(t *testing.T) { - cases := []struct { - reportedAt int64 - bucketStart int64 - bucketEnd int64 - }{ - { - reportedAt: time.Date(2022, 1, 1, 10, 5, 10, 40, time.UTC).Unix() / 60, - bucketStart: time.Date(2022, 1, 1, 10, 0, 0, 0, time.UTC).Unix() / 60, - bucketEnd: 
time.Date(2022, 1, 1, 10, 15, 0, 0, time.UTC).Unix() / 60, - }, - { - reportedAt: time.Date(2022, 2, 4, 11, 17, 59, 10, time.UTC).Unix() / 60, - bucketStart: time.Date(2022, 2, 4, 11, 15, 0, 0, time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 2, 4, 11, 30, 0, 0, time.UTC).Unix() / 60, - }, - { - reportedAt: time.Date(2022, 4, 6, 13, 39, 10, 59, time.UTC).Unix() / 60, - bucketStart: time.Date(2022, 4, 6, 13, 30, 0, 0, time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 4, 6, 13, 45, 0, 0, time.UTC).Unix() / 60, - }, - { - reportedAt: time.Date(2022, 4, 6, 13, 59, 50, 30, time.UTC).Unix() / 60, - bucketStart: time.Date(2022, 4, 6, 13, 45, 0, 0, time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 4, 6, 14, 0, 0, 0, time.UTC).Unix() / 60, - }, - } - - for _, c := range cases { - bs, be := reportHandle.getAggregationBucketMinute(c.reportedAt, 15) - require.Equal(t, c.bucketStart, bs) - require.Equal(t, c.bucketEnd, be) - } - }) - - t.Run("should choose closest factor of 60 if interval is non positive and return the correct aggregation bucket", func(t *testing.T) { - cases := []struct { - reportedAt int64 - interval int64 - bucketStart int64 - bucketEnd int64 - }{ - { - reportedAt: time.Date(2022, 1, 1, 12, 5, 10, 40, time.UTC).Unix() / 60, - interval: -1, // it should round to 1 - bucketStart: time.Date(2022, 1, 1, 12, 5, 0, 0, time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 1, 1, 12, 6, 0, 0, time.UTC).Unix() / 60, - }, - { - reportedAt: time.Date(2022, 2, 29, 10, 0, 2, 59, time.UTC).Unix() / 60, - interval: -1, // it should round to 1 - bucketStart: time.Date(2022, 2, 29, 10, 0, 0, 0, time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 2, 29, 10, 1, 0, 0, time.UTC).Unix() / 60, - }, - { - reportedAt: time.Date(2022, 2, 10, 0, 0, 0, 40, time.UTC).Unix() / 60, - interval: 0, // it should round to 1 - bucketStart: time.Date(2022, 2, 10, 0, 0, 0, 0, time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 2, 10, 0, 1, 0, 0, time.UTC).Unix() / 60, - }, - { - reportedAt: time.Date(2022, 11, 27, 23, 59, 59, 40, time.UTC).Unix() / 60, - interval: 0, // it should round to 1 - bucketStart: time.Date(2022, 11, 27, 23, 59, 59, 0, time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 11, 28, 0, 0, 0, 0, time.UTC).Unix() / 60, - }, - } - - for _, c := range cases { - bs, be := reportHandle.getAggregationBucketMinute(c.reportedAt, c.interval) - require.Equal(t, c.bucketStart, bs) - require.Equal(t, c.bucketEnd, be) - } - }) - - t.Run("should choose closest factor of 60 if interval is not a factor of 60 and return the correct aggregation bucket", func(t *testing.T) { - cases := []struct { - reportedAt int64 - interval int64 - bucketStart int64 - bucketEnd int64 - }{ - { - reportedAt: time.Date(2022, 1, 1, 10, 23, 10, 40, time.UTC).Unix() / 60, - interval: 7, // it should round to 6 - bucketStart: time.Date(2022, 1, 1, 10, 18, 0, 0, time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 1, 1, 10, 24, 0, 0, time.UTC).Unix() / 60, - }, - { - reportedAt: time.Date(2022, 1, 1, 10, 5, 10, 40, time.UTC).Unix() / 60, - interval: 14, // it should round to 12 - bucketStart: time.Date(2022, 1, 1, 10, 0, 0, 0, time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 1, 1, 10, 12, 0, 0, time.UTC).Unix() / 60, - }, - { - reportedAt: time.Date(2022, 1, 1, 10, 39, 10, 40, time.UTC).Unix() / 60, - interval: 59, // it should round to 30 - bucketStart: time.Date(2022, 1, 1, 10, 30, 0, 0, time.UTC).Unix() / 60, - bucketEnd: time.Date(2022, 1, 1, 11, 0, 0, 0, time.UTC).Unix() / 60, - }, - { - reportedAt: time.Date(2022, 1, 1, 10, 5, 10, 
-				reportedAt:  time.Date(2022, 1, 1, 10, 5, 10, 40, time.UTC).Unix() / 60,
-				interval:    63, // it should round to 60
-				bucketStart: time.Date(2022, 1, 1, 10, 0, 0, 0, time.UTC).Unix() / 60,
-				bucketEnd:   time.Date(2022, 1, 1, 11, 0, 0, 0, time.UTC).Unix() / 60,
-			},
-		}
-
-		for _, c := range cases {
-			bs, be := reportHandle.getAggregationBucketMinute(c.reportedAt, c.interval)
-			require.Equal(t, c.bucketStart, bs)
-			require.Equal(t, c.bucketEnd, be)
-		}
-	})
-}
diff --git a/enterprise/reporting/util_test.go b/enterprise/reporting/util_test.go
new file mode 100644
index 0000000000..2ca8437f9f
--- /dev/null
+++ b/enterprise/reporting/util_test.go
@@ -0,0 +1,404 @@
+package reporting
+
+import (
+	"errors"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"go.uber.org/mock/gomock"
+
+	mockEventSampler "github.com/rudderlabs/rudder-server/mocks/enterprise/reporting/event_sampler"
+	"github.com/rudderlabs/rudder-server/utils/types"
+)
+
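+// TestGetAggregationBucket verifies the [bucketStart, bucketEnd) boundaries
+// returned by getAggregationBucketMinute for valid intervals, non-positive
+// intervals, and intervals that are not factors of 60.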
+func TestGetAggregationBucket(t *testing.T) {
+	t.Run("should return the correct aggregation bucket with the default interval of 1 minute", func(t *testing.T) {
+		cases := []struct {
+			reportedAt  int64
+			bucketStart int64
+			bucketEnd   int64
+		}{
+			{
+				reportedAt:  time.Date(2022, 1, 1, 10, 5, 10, 40, time.UTC).Unix() / 60,
+				bucketStart: time.Date(2022, 1, 1, 10, 5, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 1, 1, 10, 6, 0, 0, time.UTC).Unix() / 60,
+			},
+			{
+				reportedAt:  time.Date(2022, 2, 4, 11, 5, 59, 10, time.UTC).Unix() / 60,
+				bucketStart: time.Date(2022, 2, 4, 11, 5, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 2, 4, 11, 6, 0, 0, time.UTC).Unix() / 60,
+			},
+			{
+				reportedAt:  time.Date(2022, 3, 5, 12, 59, 59, 59, time.UTC).Unix() / 60,
+				bucketStart: time.Date(2022, 3, 5, 12, 59, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 3, 5, 13, 0, 0, 0, time.UTC).Unix() / 60,
+			},
+			{
+				reportedAt:  time.Date(2022, 4, 6, 13, 0, 0, 0, time.UTC).Unix() / 60,
+				bucketStart: time.Date(2022, 4, 6, 13, 0, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 4, 6, 13, 1, 0, 0, time.UTC).Unix() / 60,
+			},
+		}
+
+		for _, c := range cases {
+			bs, be := getAggregationBucketMinute(c.reportedAt, 1)
+			require.Equal(t, c.bucketStart, bs)
+			require.Equal(t, c.bucketEnd, be)
+		}
+	})
+
+	t.Run("should return the correct aggregation bucket with an aggregation interval of 5 minutes", func(t *testing.T) {
+		cases := []struct {
+			reportedAt  int64
+			bucketStart int64
+			bucketEnd   int64
+		}{
+			{
+				reportedAt:  time.Date(2022, 1, 1, 10, 5, 10, 40, time.UTC).Unix() / 60,
+				bucketStart: time.Date(2022, 1, 1, 10, 5, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 1, 1, 10, 10, 0, 0, time.UTC).Unix() / 60,
+			},
+			{
+				reportedAt:  time.Date(2022, 2, 4, 11, 5, 59, 10, time.UTC).Unix() / 60,
+				bucketStart: time.Date(2022, 2, 4, 11, 5, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 2, 4, 11, 10, 0, 0, time.UTC).Unix() / 60,
+			},
+			{
+				reportedAt:  time.Date(2022, 4, 6, 13, 7, 30, 11, time.UTC).Unix() / 60,
+				bucketStart: time.Date(2022, 4, 6, 13, 5, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 4, 6, 13, 10, 0, 0, time.UTC).Unix() / 60,
+			},
+			{
+				reportedAt:  time.Date(2022, 4, 6, 13, 8, 50, 30, time.UTC).Unix() / 60,
+				bucketStart: time.Date(2022, 4, 6, 13, 5, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 4, 6, 13, 10, 0, 0, time.UTC).Unix() / 60,
+			},
+			{
+				reportedAt:  time.Date(2022, 4, 6, 13, 9, 5, 15, time.UTC).Unix() / 60,
+				bucketStart: time.Date(2022, 4, 6, 13, 5, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 4, 6, 13, 10, 0, 0, time.UTC).Unix() / 60,
+			},
+			{
+				reportedAt:  time.Date(2022, 3, 5, 12, 55, 53, 1, time.UTC).Unix() / 60,
+				bucketStart: time.Date(2022, 3, 5, 12, 55, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 3, 5, 13, 0, 0, 0, time.UTC).Unix() / 60,
+			},
+			{
+				reportedAt:  time.Date(2022, 3, 5, 12, 57, 53, 1, time.UTC).Unix() / 60,
+				bucketStart: time.Date(2022, 3, 5, 12, 55, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 3, 5, 13, 0, 0, 0, time.UTC).Unix() / 60,
+			},
+			{
+				reportedAt:  time.Date(2022, 3, 5, 12, 59, 59, 59, time.UTC).Unix() / 60,
+				bucketStart: time.Date(2022, 3, 5, 12, 55, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 3, 5, 13, 0, 0, 0, time.UTC).Unix() / 60,
+			},
+			{
+				reportedAt:  time.Date(2022, 4, 6, 13, 0, 0, 0, time.UTC).Unix() / 60,
+				bucketStart: time.Date(2022, 4, 6, 13, 0, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 4, 6, 13, 5, 0, 0, time.UTC).Unix() / 60,
+			},
+		}
+
+		for _, c := range cases {
+			bs, be := getAggregationBucketMinute(c.reportedAt, 5)
+			require.Equal(t, c.bucketStart, bs)
+			require.Equal(t, c.bucketEnd, be)
+		}
+	})
+
+	t.Run("should return the correct aggregation bucket with an aggregation interval of 15 minutes", func(t *testing.T) {
+		cases := []struct {
+			reportedAt  int64
+			bucketStart int64
+			bucketEnd   int64
+		}{
+			{
+				reportedAt:  time.Date(2022, 1, 1, 10, 5, 10, 40, time.UTC).Unix() / 60,
+				bucketStart: time.Date(2022, 1, 1, 10, 0, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 1, 1, 10, 15, 0, 0, time.UTC).Unix() / 60,
+			},
+			{
+				reportedAt:  time.Date(2022, 2, 4, 11, 17, 59, 10, time.UTC).Unix() / 60,
+				bucketStart: time.Date(2022, 2, 4, 11, 15, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 2, 4, 11, 30, 0, 0, time.UTC).Unix() / 60,
+			},
+			{
+				reportedAt:  time.Date(2022, 4, 6, 13, 39, 10, 59, time.UTC).Unix() / 60,
+				bucketStart: time.Date(2022, 4, 6, 13, 30, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 4, 6, 13, 45, 0, 0, time.UTC).Unix() / 60,
+			},
+			{
+				reportedAt:  time.Date(2022, 4, 6, 13, 59, 50, 30, time.UTC).Unix() / 60,
+				bucketStart: time.Date(2022, 4, 6, 13, 45, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 4, 6, 14, 0, 0, 0, time.UTC).Unix() / 60,
+			},
+		}
+
+		for _, c := range cases {
+			bs, be := getAggregationBucketMinute(c.reportedAt, 15)
+			require.Equal(t, c.bucketStart, bs)
+			require.Equal(t, c.bucketEnd, be)
+		}
+	})
+
+	t.Run("should clamp the interval to 1 if it is non-positive and return the correct aggregation bucket", func(t *testing.T) {
+		cases := []struct {
+			reportedAt  int64
+			interval    int64
+			bucketStart int64
+			bucketEnd   int64
+		}{
+			{
+				reportedAt:  time.Date(2022, 1, 1, 12, 5, 10, 40, time.UTC).Unix() / 60,
+				interval:    -1, // it should clamp to 1
+				bucketStart: time.Date(2022, 1, 1, 12, 5, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 1, 1, 12, 6, 0, 0, time.UTC).Unix() / 60,
+			},
+			{
+				reportedAt:  time.Date(2022, 3, 1, 10, 0, 2, 59, time.UTC).Unix() / 60,
+				interval:    -1, // it should clamp to 1
+				bucketStart: time.Date(2022, 3, 1, 10, 0, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 3, 1, 10, 1, 0, 0, time.UTC).Unix() / 60,
+			},
+			{
+				reportedAt:  time.Date(2022, 2, 10, 0, 0, 0, 40, time.UTC).Unix() / 60,
+				interval:    0, // it should clamp to 1
+				bucketStart: time.Date(2022, 2, 10, 0, 0, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 2, 10, 0, 1, 0, 0, time.UTC).Unix() / 60,
+			},
+			{
+				reportedAt:  time.Date(2022, 11, 27, 23, 59, 59, 40, time.UTC).Unix() / 60,
+				interval:    0, // it should clamp to 1
+				bucketStart: time.Date(2022, 11, 27, 23, 59, 59, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 11, 28, 0, 0, 0, 0, time.UTC).Unix() / 60,
+			},
+		}
+
+		for _, c := range cases {
+			bs, be := getAggregationBucketMinute(c.reportedAt, c.interval)
+			require.Equal(t, c.bucketStart, bs)
+			require.Equal(t, c.bucketEnd, be)
+		}
+	})
+
+	t.Run("should floor the interval to the closest smaller factor of 60 if it is not a factor of 60 and return the correct aggregation bucket", func(t *testing.T) {
+		cases := []struct {
+			reportedAt  int64
+			interval    int64
+			bucketStart int64
+			bucketEnd   int64
+		}{
+			{
+				reportedAt:  time.Date(2022, 1, 1, 10, 23, 10, 40, time.UTC).Unix() / 60,
+				interval:    7, // it should floor to 6
+				bucketStart: time.Date(2022, 1, 1, 10, 18, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 1, 1, 10, 24, 0, 0, time.UTC).Unix() / 60,
+			},
+			{
+				reportedAt:  time.Date(2022, 1, 1, 10, 5, 10, 40, time.UTC).Unix() / 60,
+				interval:    14, // it should floor to 12
+				bucketStart: time.Date(2022, 1, 1, 10, 0, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 1, 1, 10, 12, 0, 0, time.UTC).Unix() / 60,
+			},
+			{
+				reportedAt:  time.Date(2022, 1, 1, 10, 39, 10, 40, time.UTC).Unix() / 60,
+				interval:    59, // it should floor to 30
+				bucketStart: time.Date(2022, 1, 1, 10, 30, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 1, 1, 11, 0, 0, 0, time.UTC).Unix() / 60,
+			},
+			{
+				reportedAt:  time.Date(2022, 1, 1, 10, 5, 10, 40, time.UTC).Unix() / 60,
+				interval:    63, // it should floor to 60
+				bucketStart: time.Date(2022, 1, 1, 10, 0, 0, 0, time.UTC).Unix() / 60,
+				bucketEnd:   time.Date(2022, 1, 1, 11, 0, 0, 0, time.UTC).Unix() / 60,
+			},
+		}
+
+		for _, c := range cases {
+			bs, be := getAggregationBucketMinute(c.reportedAt, c.interval)
+			require.Equal(t, c.bucketStart, bs)
+			require.Equal(t, c.bucketEnd, be)
+		}
+	})
+}
+
+func TestTransformMetricWithEventSamplingWithNilEventSampler(t *testing.T) {
+	sampleEvent := []byte(`{"event": "1"}`)
+	sampleResponse := "response"
+	metric := types.PUReportedMetric{
+		StatusDetail: &types.StatusDetail{
+			SampleEvent:    sampleEvent,
+			SampleResponse: sampleResponse,
+		},
+	}
+	transformedMetric, err := transformMetricWithEventSampling(metric, 1234567890, nil, 60)
+	require.NoError(t, err)
+	require.Equal(t, sampleEvent, []byte(transformedMetric.StatusDetail.SampleEvent))
+	require.Equal(t, sampleResponse, transformedMetric.StatusDetail.SampleResponse)
+}
+
+func TestFloorFactor(t *testing.T) {
+	tests := []struct {
+		name       string
+		intervalMs int64
+		expected   int64
+	}{
+		// Edge cases
+		{name: "Smaller than smallest factor", intervalMs: 0, expected: 1},
+		{name: "Exact match for smallest factor", intervalMs: 1, expected: 1},
+		{name: "Exact match for largest factor", intervalMs: 60, expected: 60},
+		{name: "Larger than largest factor", intervalMs: 100, expected: 60},
+
+		// Typical cases
+		{name: "Between 10 and 12", intervalMs: 11, expected: 10},
+		{name: "Between 4 and 6", intervalMs: 5, expected: 5},
+		{name: "Between 20 and 30", intervalMs: 25, expected: 20},
+		{name: "Exact match in the middle", intervalMs: 30, expected: 30},
+		{name: "Exact match at a non-boundary point", intervalMs: 12, expected: 12},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			result := floorFactor(test.intervalMs)
+			require.Equal(t, test.expected, result)
+		})
+	}
+}
+
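+// TestTransformMetricWithEventSampling exercises transformMetricWithEventSampling
+// against a mocked EventSampler: nil and empty sample events pass through
+// untouched, Get/Put errors are propagated, and a hash that is already present
+// empties the sample event and response.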
+func TestTransformMetricWithEventSampling(t *testing.T) {
+	sampleEvent := []byte(`{"event": "2"}`)
+	sampleResponse := "sample response"
+	emptySampleEvent := []byte(`{}`)
+	emptySampleResponse := ""
+
+	tests := []struct {
+		name       string
+		metric     types.PUReportedMetric
+		wantMetric types.PUReportedMetric
+		shouldGet  bool
+		getError   error
+		found      bool
+		shouldPut  bool
+		putError   error
+	}{
+		{
+			name: "Nil sample event",
+			metric: types.PUReportedMetric{
+				StatusDetail: &types.StatusDetail{
+					SampleEvent: nil,
+				},
+			},
+			wantMetric: types.PUReportedMetric{
+				StatusDetail: &types.StatusDetail{
+					SampleEvent: nil,
+				},
+			},
+		},
+		{
+			name: "Empty sample event",
+			metric: types.PUReportedMetric{
+				StatusDetail: &types.StatusDetail{
+					SampleEvent: emptySampleEvent,
+				},
+			},
+			wantMetric: types.PUReportedMetric{
+				StatusDetail: &types.StatusDetail{
+					SampleEvent: emptySampleEvent,
+				},
+			},
+		},
+		{
+			name: "Event Sampler returns get error",
+			metric: types.PUReportedMetric{
+				StatusDetail: &types.StatusDetail{
+					SampleEvent:    sampleEvent,
+					SampleResponse: sampleResponse,
+				},
+			},
+			wantMetric: types.PUReportedMetric{
+				StatusDetail: &types.StatusDetail{
+					SampleEvent:    sampleEvent,
+					SampleResponse: sampleResponse,
+				},
+			},
+			shouldGet: true,
+			getError:  errors.New("get error"),
+		},
+		{
+			name: "Event Sampler returns put error",
+			metric: types.PUReportedMetric{
+				StatusDetail: &types.StatusDetail{
+					SampleEvent:    sampleEvent,
+					SampleResponse: sampleResponse,
+				},
+			},
+			wantMetric: types.PUReportedMetric{
+				StatusDetail: &types.StatusDetail{
+					SampleEvent:    sampleEvent,
+					SampleResponse: sampleResponse,
+				},
+			},
+			shouldGet: true,
+			shouldPut: true,
+			putError:  errors.New("put error"),
+		},
+		{
+			name: "Sample is not found",
+			metric: types.PUReportedMetric{
+				StatusDetail: &types.StatusDetail{
+					SampleEvent:    sampleEvent,
+					SampleResponse: sampleResponse,
+				},
+			},
+			wantMetric: types.PUReportedMetric{
+				StatusDetail: &types.StatusDetail{
+					SampleEvent:    sampleEvent,
+					SampleResponse: sampleResponse,
+				},
+			},
+			shouldGet: true,
+			shouldPut: true,
+		},
+		{
+			name: "Sample is found",
+			metric: types.PUReportedMetric{
+				StatusDetail: &types.StatusDetail{
+					SampleEvent:    sampleEvent,
+					SampleResponse: sampleResponse,
+				},
+			},
+			wantMetric: types.PUReportedMetric{
+				StatusDetail: &types.StatusDetail{
+					SampleEvent:    emptySampleEvent,
+					SampleResponse: emptySampleResponse,
+				},
+			},
+			shouldGet: true,
+			found:     true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ctrl := gomock.NewController(t)
+			mockSampler := mockEventSampler.NewMockEventSampler(ctrl)
+			if tt.shouldGet {
+				mockSampler.EXPECT().Get(gomock.Any()).Return(tt.found, tt.getError)
+			}
+			if tt.shouldPut {
+				mockSampler.EXPECT().Put(gomock.Any()).Return(tt.putError)
+			}
+
+			gotMetric, err := transformMetricWithEventSampling(tt.metric, 1234567890, mockSampler, 60)
+			if tt.getError != nil || tt.putError != nil {
+				require.Error(t, err)
+			} else {
+				require.NoError(t, err)
+			}
+			require.Equal(t, tt.wantMetric.StatusDetail.SampleEvent, gotMetric.StatusDetail.SampleEvent)
+			require.Equal(t, tt.wantMetric.StatusDetail.SampleResponse, gotMetric.StatusDetail.SampleResponse)
+		})
+	}
+}
diff --git a/enterprise/reporting/utils.go b/enterprise/reporting/utils.go
new file mode 100644
index 0000000000..ab28b78b1a
--- /dev/null
+++ b/enterprise/reporting/utils.go
@@ -0,0 +1,103 @@
+package reporting
+
+import (
+	"encoding/json"
+	"sort"
+	"strings"
+
+	"github.com/rudderlabs/rudder-go-kit/config"
+	"github.com/rudderlabs/rudder-server/enterprise/reporting/event_sampler"
+	"github.com/rudderlabs/rudder-server/utils/types"
+)
+
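+// floorFactor returns the largest factor of 60 that does not exceed
+// intervalMs, clamping to 1 for non-positive values and to 60 for values
+// greater than 60.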
+func floorFactor(intervalMs int64) int64 {
+	factors := []int64{1, 2, 3, 4, 5, 6, 10, 12, 15, 20, 30, 60}
+
+	// Find the smallest index where factors[i] >= intervalMs
+	index := sort.Search(len(factors), func(i int) bool {
+		return factors[i] >= intervalMs
+	})
+
+	// If index is 0, intervalMs is smaller than the smallest factor
+	if index == 0 {
+		return factors[0]
+	}
+
+	// If factors[index] == intervalMs, return it directly
+	if index < len(factors) && factors[index] == intervalMs {
+		return factors[index]
+	}
+
+	// Otherwise, return the previous factor
+	return factors[index-1]
+}
+
+// getAggregationBucketMinute returns the [start, end) boundaries, in minutes
+// since epoch, of the aggregation bucket that contains timeMs.
+func getAggregationBucketMinute(timeMs, intervalMs int64) (int64, int64) {
+	// If the interval is not a factor of 60, the bucket start will not be aligned to the hour.
+	// For example, if intervalMs is 7 and timeMs is 28891085 (6:05), the bucket start would be
+	// 28891079 (5:59), and the bucket would then span two different hourly buckets, which should
+	// not happen. To avoid this, we floor intervalMs to the closest factor of 60 that does not
+	// exceed it.
+	if intervalMs <= 0 || 60%intervalMs != 0 {
+		intervalMs = floorFactor(intervalMs)
+	}
+	bucketStart := timeMs - (timeMs % intervalMs)
+	bucketEnd := bucketStart + intervalMs
+	return bucketStart, bucketEnd
+}
+
+// transformMetricWithEventSampling empties the sample event and response of a
+// metric whose label-set hash has already been seen in the current sampling
+// bucket; otherwise it records the hash so later duplicates can be dropped.
+func transformMetricWithEventSampling(metric types.PUReportedMetric, reportedAt int64, eventSampler event_sampler.EventSampler, eventSamplingDuration int64) (types.PUReportedMetric, error) {
+	if eventSampler == nil {
+		return metric, nil
+	}
+
+	isValidSample := (metric.StatusDetail.SampleEvent != nil && string(metric.StatusDetail.SampleEvent) != "{}") || metric.StatusDetail.SampleResponse != ""
+
+	if isValidSample {
+		sampleEventBucket, _ := getAggregationBucketMinute(reportedAt, eventSamplingDuration)
+		hash := NewLabelSet(metric, sampleEventBucket).generateHash()
+		found, err := eventSampler.Get(hash)
+		if err != nil {
+			return metric, err
+		}
+
+		if found {
+			metric.StatusDetail.SampleEvent = json.RawMessage(`{}`)
+			metric.StatusDetail.SampleResponse = ""
+		} else {
+			err := eventSampler.Put(hash)
+			if err != nil {
+				return metric, err
+			}
+		}
+	}
+	return metric, nil
+}
+
+func transformMetricForPII(metric types.PUReportedMetric, piiColumns []string) types.PUReportedMetric {
+	for _, col := range piiColumns {
+		switch col {
+		case "sample_event":
+			metric.StatusDetail.SampleEvent = []byte(`{}`)
+		case "sample_response":
+			metric.StatusDetail.SampleResponse = ""
+		case "event_name":
+			metric.StatusDetail.EventName = ""
+		case "event_type":
+			metric.StatusDetail.EventType = ""
+		}
+	}
+
+	return metric
+}
+
+func isMetricPosted(status int) bool {
+	return status >= 200 && status < 300
+}
+
+func getPIIColumnsToExclude() []string {
+	piiColumnsToExclude := strings.Split(config.GetString("REPORTING_PII_COLUMNS_TO_EXCLUDE", "sample_event,sample_response"), ",")
+	for i := range piiColumnsToExclude {
+		piiColumnsToExclude[i] = strings.Trim(piiColumnsToExclude[i], " ")
+	}
+	return piiColumnsToExclude
+}
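+
+// A minimal usage sketch of the helpers above (illustrative only; `sampler`
+// and `metric` are assumed to exist as an event_sampler.EventSampler and a
+// types.PUReportedMetric):
+//
+//	reportedAt := time.Now().UTC().Unix() / 60 // minutes since epoch
+//	bucketStart, bucketEnd := getAggregationBucketMinute(reportedAt, 15)
+//	metric, err := transformMetricWithEventSampling(metric, reportedAt, sampler, 60)
+//	// if the same label set was already sampled within the current 60-minute
+//	// bucket, SampleEvent and SampleResponse come back emptied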
diff --git a/mocks/enterprise/reporting/event_sampler/mock_event_sampler.go b/mocks/enterprise/reporting/event_sampler/mock_event_sampler.go
new file mode 100644
index 0000000000..6a80231099
--- /dev/null
+++ b/mocks/enterprise/reporting/event_sampler/mock_event_sampler.go
@@ -0,0 +1,81 @@
+// Code generated by MockGen. DO NOT EDIT.
+// Source: github.com/rudderlabs/rudder-server/enterprise/reporting/event_sampler (interfaces: EventSampler)
+//
+// Generated by this command:
+//
+//	mockgen -destination=../../../mocks/enterprise/reporting/event_sampler/mock_event_sampler.go -package=mocks github.com/rudderlabs/rudder-server/enterprise/reporting/event_sampler EventSampler
+//
+
+// Package mocks is a generated GoMock package.
+package mocks
+
+import (
+	reflect "reflect"
+
+	gomock "go.uber.org/mock/gomock"
+)
+
+// MockEventSampler is a mock of EventSampler interface.
+type MockEventSampler struct {
+	ctrl     *gomock.Controller
+	recorder *MockEventSamplerMockRecorder
+	isgomock struct{}
+}
+
+// MockEventSamplerMockRecorder is the mock recorder for MockEventSampler.
+type MockEventSamplerMockRecorder struct {
+	mock *MockEventSampler
+}
+
+// NewMockEventSampler creates a new mock instance.
+func NewMockEventSampler(ctrl *gomock.Controller) *MockEventSampler {
+	mock := &MockEventSampler{ctrl: ctrl}
+	mock.recorder = &MockEventSamplerMockRecorder{mock}
+	return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockEventSampler) EXPECT() *MockEventSamplerMockRecorder {
+	return m.recorder
+}
+
+// Close mocks base method.
+func (m *MockEventSampler) Close() {
+	m.ctrl.T.Helper()
+	m.ctrl.Call(m, "Close")
+}
+
+// Close indicates an expected call of Close.
+func (mr *MockEventSamplerMockRecorder) Close() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockEventSampler)(nil).Close))
+}
+
+// Get mocks base method.
+func (m *MockEventSampler) Get(key string) (bool, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "Get", key)
+	ret0, _ := ret[0].(bool)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// Get indicates an expected call of Get.
+func (mr *MockEventSamplerMockRecorder) Get(key any) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockEventSampler)(nil).Get), key)
+}
+
+// Put mocks base method.
+func (m *MockEventSampler) Put(key string) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "Put", key)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// Put indicates an expected call of Put.
+func (mr *MockEventSamplerMockRecorder) Put(key any) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockEventSampler)(nil).Put), key)
+}
diff --git a/utils/types/reporting_types.go b/utils/types/reporting_types.go
index f7334968bb..08b1a6bcc9 100644
--- a/utils/types/reporting_types.go
+++ b/utils/types/reporting_types.go
@@ -57,10 +57,16 @@ type StatusDetail struct {
 	EventType      string            `json:"eventType"`
 	ErrorType      string            `json:"errorType"`
 	ViolationCount int64             `json:"violationCount"`
+	ErrorDetails   ErrorDetails      `json:"-"`
 	StatTags       map[string]string `json:"-"`
 	FailedMessages []*FailedMessage  `json:"-"`
 }
 
+type ErrorDetails struct {
+	Code    string
+	Message string
+}
+
 type FailedMessage struct {
 	MessageID  string    `json:"messageId"`
 	ReceivedAt time.Time `json:"receivedAt"`