Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: reduce the error report sample events #5371

Merged
merged 16 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 40 additions & 15 deletions enterprise/reporting/error_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"github.com/rudderlabs/rudder-go-kit/stats/collectors"

obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
"github.com/rudderlabs/rudder-server/enterprise/reporting/event_sampler"
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
"github.com/rudderlabs/rudder-server/utils/httputil"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
Expand Down Expand Up @@ -86,16 +87,14 @@
minReportedAtQueryTime stats.Measurement
errorDetailReportsQueryTime stats.Measurement
edReportingRequestLatency stats.Measurement
eventSamplingEnabled config.ValueLoader[bool]
eventSamplingDuration config.ValueLoader[time.Duration]
eventSampler event_sampler.EventSampler

stats stats.Stats
config *config.Config
}

type errorDetails struct {
ErrorCode string
ErrorMessage string
}

func NewErrorDetailReporter(
ctx context.Context,
configSubscriber *configSubscriber,
Expand All @@ -111,11 +110,26 @@
sleepInterval := conf.GetReloadableDurationVar(30, time.Second, "Reporting.sleepInterval")
maxConcurrentRequests := conf.GetReloadableIntVar(32, 1, "Reporting.maxConcurrentRequests")
maxOpenConnections := conf.GetIntVar(16, 1, "Reporting.errorReporting.maxOpenConnections")
eventSamplingEnabled := conf.GetReloadableBoolVar(false, "Reporting.errorReporting.eventSampling.enabled")
eventSamplingDuration := conf.GetReloadableDurationVar(60, time.Minute, "Reporting.eventSampling.durationInMinutes")
eventSamplerType := conf.GetReloadableStringVar("badger", "Reporting.eventSampling.type")
eventSamplingCardinality := conf.GetReloadableIntVar(100000, 1, "Reporting.eventSampling.cardinality")

log := logger.NewLogger().Child("enterprise").Child("error-detail-reporting")
extractor := NewErrorDetailExtractor(log)
ctx, cancel := context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)

var eventSampler event_sampler.EventSampler

if eventSamplingEnabled.Load() {
var err error
eventSampler, err = event_sampler.NewEventSampler(ctx, eventSamplingDuration, eventSamplerType, eventSamplingCardinality, event_sampler.BadgerEventSamplerErrorsPathName, conf, log)
if err != nil {
panic(err)

Check warning on line 129 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L126-L129

Added lines #L126 - L129 were not covered by tests
}
}

return &ErrorDetailReporter{
ctx: ctx,
cancel: cancel,
Expand All @@ -128,6 +142,10 @@
vacuumFull: conf.GetReloadableBoolVar(true, "Reporting.errorReporting.vacuumFull", "Reporting.vacuumFull"),
httpClient: netClient,

eventSamplingEnabled: eventSamplingEnabled,
eventSamplingDuration: eventSamplingDuration,
eventSampler: eventSampler,
koladilip marked this conversation as resolved.
Show resolved Hide resolved

namespace: config.GetKubeNamespace(),
instanceID: conf.GetString("INSTANCE_ID", "1"),
region: conf.GetString("region", ""),
Expand Down Expand Up @@ -198,7 +216,7 @@
return edr.syncers[syncerKey]
}

func shouldReport(metric *types.PUReportedMetric) bool {
func shouldReport(metric types.PUReportedMetric) bool {
switch {
case metric.StatusDetail.StatusCode >= http.StatusBadRequest, metric.StatusDetail.StatusCode == types.FilterEventCode, metric.StatusDetail.StatusCode == types.SuppressEventCode:
return true
Expand All @@ -222,12 +240,12 @@

reportedAt := time.Now().UTC().Unix() / 60
for _, metric := range metrics {
metric := *metric
if !shouldReport(metric) {
continue
}

workspaceID := edr.configSubscriber.WorkspaceIDFromSource(metric.ConnectionDetails.SourceID)
metric := *metric
if edr.IsPIIReportingDisabled(workspaceID) {
edr.log.Debugn("PII setting is disabled for workspaceId:", obskit.WorkspaceID(workspaceID))
return nil
Expand All @@ -236,16 +254,23 @@
edr.log.Debugn("DestinationId & DestDetail details", obskit.DestinationID(metric.ConnectionDetails.DestinationID), logger.NewField("destinationDetail", destinationDetail))

// extract error-message & error-code
errDets := edr.extractErrorDetails(metric.StatusDetail.SampleResponse, metric.StatusDetail.StatTags)
metric.StatusDetail.ErrorDetails = edr.extractErrorDetails(metric.StatusDetail.SampleResponse, metric.StatusDetail.StatTags)

edr.stats.NewTaggedStat("error_detail_reporting_failures", stats.CountType, stats.Tags{
"errorCode": errDets.ErrorCode,
"errorCode": metric.StatusDetail.ErrorDetails.Code,
"workspaceId": workspaceID,
"destType": destinationDetail.destType,
"sourceId": metric.ConnectionDetails.SourceID,
"destinationId": metric.ConnectionDetails.DestinationID,
}).Count(int(metric.StatusDetail.Count))

if edr.eventSamplingEnabled.Load() {
metric, err = transformMetricWithEventSampling(metric, reportedAt, edr.eventSampler, int64(edr.eventSamplingDuration.Load().Minutes()))
if err != nil {
return err
}

Check warning on line 271 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L268-L271

Added lines #L268 - L271 were not covered by tests
}

_, err = stmt.Exec(
workspaceID,
edr.namespace,
Expand All @@ -260,8 +285,8 @@
metric.StatusDetail.Count,
metric.StatusDetail.StatusCode,
metric.StatusDetail.EventType,
errDets.ErrorCode,
errDets.ErrorMessage,
metric.StatusDetail.ErrorDetails.Code,
metric.StatusDetail.ErrorDetails.Message,
metric.StatusDetail.SampleResponse,
string(metric.StatusDetail.SampleEvent),
metric.StatusDetail.EventName,
Expand Down Expand Up @@ -309,13 +334,13 @@
return dbHandle, nil
}

func (edr *ErrorDetailReporter) extractErrorDetails(sampleResponse string, statTags map[string]string) errorDetails {
func (edr *ErrorDetailReporter) extractErrorDetails(sampleResponse string, statTags map[string]string) types.ErrorDetails {
errMsg := edr.errorDetailExtractor.GetErrorMessage(sampleResponse)
cleanedErrMsg := edr.errorDetailExtractor.CleanUpErrorMessage(errMsg)
errorCode := edr.errorDetailExtractor.GetErrorCode(cleanedErrMsg, statTags)
return errorDetails{
ErrorMessage: cleanedErrMsg,
ErrorCode: errorCode,
return types.ErrorDetails{
Message: cleanedErrMsg,
Code: errorCode,
}
}

Expand Down
Loading
Loading