Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: reduce the error report sample events #5371

Merged
merged 16 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 43 additions & 16 deletions enterprise/reporting/error_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"github.com/rudderlabs/rudder-go-kit/stats/collectors"

obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"

"github.com/rudderlabs/rudder-server/enterprise/reporting/event_sampler"
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
"github.com/rudderlabs/rudder-server/utils/httputil"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
Expand Down Expand Up @@ -87,16 +87,14 @@
minReportedAtQueryTime stats.Measurement
errorDetailReportsQueryTime stats.Measurement
edReportingRequestLatency stats.Measurement
eventSamplingEnabled config.ValueLoader[bool]
eventSamplingDuration config.ValueLoader[time.Duration]
eventSampler event_sampler.EventSampler

stats stats.Stats
config *config.Config
}

type errorDetails struct {
ErrorCode string
ErrorMessage string
}

func NewErrorDetailReporter(
ctx context.Context,
configSubscriber *configSubscriber,
Expand All @@ -112,11 +110,26 @@
sleepInterval := conf.GetReloadableDurationVar(30, time.Second, "Reporting.sleepInterval")
maxConcurrentRequests := conf.GetReloadableIntVar(32, 1, "Reporting.maxConcurrentRequests")
maxOpenConnections := conf.GetIntVar(16, 1, "Reporting.errorReporting.maxOpenConnections")
eventSamplingEnabled := conf.GetReloadableBoolVar(false, "Reporting.errorReporting.eventSampling.enabled")
eventSamplingDuration := conf.GetReloadableDurationVar(60, time.Minute, "Reporting.eventSampling.durationInMinutes")
eventSamplerType := conf.GetReloadableStringVar("badger", "Reporting.eventSampling.type")
eventSamplingCardinality := conf.GetReloadableIntVar(100000, 1, "Reporting.eventSampling.cardinality")

log := logger.NewLogger().Child("enterprise").Child("error-detail-reporting")
extractor := NewErrorDetailExtractor(log)
ctx, cancel := context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)

var eventSampler event_sampler.EventSampler

if eventSamplingEnabled.Load() {
var err error
eventSampler, err = event_sampler.NewEventSampler(ctx, eventSamplingDuration, eventSamplerType, eventSamplingCardinality, event_sampler.BadgerEventSamplerErrorsPathName, conf, log)
if err != nil {
panic(err)

Check warning on line 129 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L126-L129

Added lines #L126 - L129 were not covered by tests
}
}

return &ErrorDetailReporter{
ctx: ctx,
cancel: cancel,
Expand All @@ -129,6 +142,10 @@
vacuumFull: conf.GetReloadableBoolVar(true, "Reporting.errorReporting.vacuumFull", "Reporting.vacuumFull"),
httpClient: netClient,

eventSamplingEnabled: eventSamplingEnabled,
eventSamplingDuration: eventSamplingDuration,
eventSampler: eventSampler,
koladilip marked this conversation as resolved.
Show resolved Hide resolved

namespace: config.GetKubeNamespace(),
instanceID: conf.GetString("INSTANCE_ID", "1"),
region: conf.GetString("region", ""),
Expand Down Expand Up @@ -201,7 +218,7 @@
return edr.syncers[syncerKey]
}

func shouldReport(metric *types.PUReportedMetric) bool {
func shouldReport(metric types.PUReportedMetric) bool {
switch {
case metric.StatusDetail.StatusCode >= http.StatusBadRequest, metric.StatusDetail.StatusCode == types.FilterEventCode, metric.StatusDetail.StatusCode == types.SuppressEventCode:
return true
Expand All @@ -225,12 +242,12 @@

reportedAt := time.Now().UTC().Unix() / 60
for _, metric := range metrics {
metric := *metric
if !shouldReport(metric) {
continue
}

workspaceID := edr.configSubscriber.WorkspaceIDFromSource(metric.ConnectionDetails.SourceID)
metric := *metric
if edr.IsPIIReportingDisabled(workspaceID) {
edr.log.Debugn("PII setting is disabled for workspaceId:", obskit.WorkspaceID(workspaceID))
return nil
Expand All @@ -239,16 +256,23 @@
edr.log.Debugn("DestinationId & DestDetail details", obskit.DestinationID(metric.ConnectionDetails.DestinationID), logger.NewField("destinationDetail", destinationDetail))

// extract error-message & error-code
errDets := edr.extractErrorDetails(metric.StatusDetail.SampleResponse, metric.StatusDetail.StatTags)
metric.StatusDetail.ErrorDetails = edr.extractErrorDetails(metric.StatusDetail.SampleResponse, metric.StatusDetail.StatTags)

edr.stats.NewTaggedStat("error_detail_reporting_failures", stats.CountType, stats.Tags{
"errorCode": errDets.ErrorCode,
"errorCode": metric.StatusDetail.ErrorDetails.Code,
"workspaceId": workspaceID,
"destType": destinationDetail.destType,
"sourceId": metric.ConnectionDetails.SourceID,
"destinationId": metric.ConnectionDetails.DestinationID,
}).Count(int(metric.StatusDetail.Count))

if edr.eventSamplingEnabled.Load() {
metric, err = transformMetricWithEventSampling(metric, reportedAt, edr.eventSampler, int64(edr.eventSamplingDuration.Load().Minutes()))
if err != nil {
return err
}

Check warning on line 273 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L270-L273

Added lines #L270 - L273 were not covered by tests
}

_, err = stmt.Exec(
workspaceID,
edr.namespace,
Expand All @@ -263,8 +287,8 @@
metric.StatusDetail.Count,
metric.StatusDetail.StatusCode,
metric.StatusDetail.EventType,
errDets.ErrorCode,
errDets.ErrorMessage,
metric.StatusDetail.ErrorDetails.Code,
metric.StatusDetail.ErrorDetails.Message,
metric.StatusDetail.SampleResponse,
string(metric.StatusDetail.SampleEvent),
metric.StatusDetail.EventName,
Expand Down Expand Up @@ -312,13 +336,13 @@
return dbHandle, nil
}

func (edr *ErrorDetailReporter) extractErrorDetails(sampleResponse string, statTags map[string]string) errorDetails {
func (edr *ErrorDetailReporter) extractErrorDetails(sampleResponse string, statTags map[string]string) types.ErrorDetails {
errMsg := edr.errorDetailExtractor.GetErrorMessage(sampleResponse)
cleanedErrMsg := edr.errorDetailExtractor.CleanUpErrorMessage(errMsg)
errorCode := edr.errorDetailExtractor.GetErrorCode(cleanedErrMsg, statTags)
return errorDetails{
ErrorMessage: cleanedErrMsg,
ErrorCode: errorCode,
return types.ErrorDetails{
Message: cleanedErrMsg,
Code: errorCode,
}
}

Expand Down Expand Up @@ -776,4 +800,7 @@
func (edr *ErrorDetailReporter) Stop() {
edr.cancel()
_ = edr.g.Wait()
if edr.eventSampler != nil {
edr.eventSampler.Close()
}

Check warning on line 805 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L803-L805

Added lines #L803 - L805 were not covered by tests
}
Loading
Loading