Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: reduce the error report sample events #5371

Merged
merged 16 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 43 additions & 16 deletions enterprise/reporting/error_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/stats/collectors"

obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"

"github.com/rudderlabs/rudder-server/enterprise/reporting/event_sampler"
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
"github.com/rudderlabs/rudder-server/utils/httputil"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
Expand Down Expand Up @@ -87,16 +87,14 @@ type ErrorDetailReporter struct {
minReportedAtQueryTime stats.Measurement
errorDetailReportsQueryTime stats.Measurement
edReportingRequestLatency stats.Measurement
eventSamplingEnabled config.ValueLoader[bool]
eventSamplingDuration config.ValueLoader[time.Duration]
eventSampler event_sampler.EventSampler

stats stats.Stats
config *config.Config
}

type errorDetails struct {
ErrorCode string
ErrorMessage string
}

func NewErrorDetailReporter(
ctx context.Context,
configSubscriber *configSubscriber,
Expand All @@ -112,11 +110,26 @@ func NewErrorDetailReporter(
sleepInterval := conf.GetReloadableDurationVar(30, time.Second, "Reporting.sleepInterval")
maxConcurrentRequests := conf.GetReloadableIntVar(32, 1, "Reporting.maxConcurrentRequests")
maxOpenConnections := conf.GetIntVar(16, 1, "Reporting.errorReporting.maxOpenConnections")
eventSamplingEnabled := conf.GetReloadableBoolVar(false, "Reporting.errorReporting.eventSampling.enabled")
eventSamplingDuration := conf.GetReloadableDurationVar(60, time.Minute, "Reporting.eventSampling.durationInMinutes")
eventSamplerType := conf.GetReloadableStringVar("badger", "Reporting.eventSampling.type")
eventSamplingCardinality := conf.GetReloadableIntVar(100000, 1, "Reporting.eventSampling.cardinality")

log := logger.NewLogger().Child("enterprise").Child("error-detail-reporting")
extractor := NewErrorDetailExtractor(log)
ctx, cancel := context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)

var eventSampler event_sampler.EventSampler

if eventSamplingEnabled.Load() {
var err error
eventSampler, err = event_sampler.NewEventSampler(ctx, eventSamplingDuration, eventSamplerType, eventSamplingCardinality, event_sampler.BadgerEventSamplerErrorsPathName, conf, log)
if err != nil {
panic(err)
}
}

return &ErrorDetailReporter{
ctx: ctx,
cancel: cancel,
Expand All @@ -129,6 +142,10 @@ func NewErrorDetailReporter(
vacuumFull: conf.GetReloadableBoolVar(true, "Reporting.errorReporting.vacuumFull", "Reporting.vacuumFull"),
httpClient: netClient,

eventSamplingEnabled: eventSamplingEnabled,
eventSamplingDuration: eventSamplingDuration,
eventSampler: eventSampler,
koladilip marked this conversation as resolved.
Show resolved Hide resolved

namespace: config.GetKubeNamespace(),
instanceID: conf.GetString("INSTANCE_ID", "1"),
region: conf.GetString("region", ""),
Expand Down Expand Up @@ -201,7 +218,7 @@ func (edr *ErrorDetailReporter) GetSyncer(syncerKey string) *types.SyncSource {
return edr.syncers[syncerKey]
}

func shouldReport(metric *types.PUReportedMetric) bool {
func shouldReport(metric types.PUReportedMetric) bool {
switch {
case metric.StatusDetail.StatusCode >= http.StatusBadRequest, metric.StatusDetail.StatusCode == types.FilterEventCode, metric.StatusDetail.StatusCode == types.SuppressEventCode:
return true
Expand All @@ -225,12 +242,12 @@ func (edr *ErrorDetailReporter) Report(ctx context.Context, metrics []*types.PUR

reportedAt := time.Now().UTC().Unix() / 60
for _, metric := range metrics {
metric := *metric
if !shouldReport(metric) {
continue
}

workspaceID := edr.configSubscriber.WorkspaceIDFromSource(metric.ConnectionDetails.SourceID)
metric := *metric
if edr.IsPIIReportingDisabled(workspaceID) {
edr.log.Debugn("PII setting is disabled for workspaceId:", obskit.WorkspaceID(workspaceID))
return nil
Expand All @@ -239,16 +256,23 @@ func (edr *ErrorDetailReporter) Report(ctx context.Context, metrics []*types.PUR
edr.log.Debugn("DestinationId & DestDetail details", obskit.DestinationID(metric.ConnectionDetails.DestinationID), logger.NewField("destinationDetail", destinationDetail))

// extract error-message & error-code
errDets := edr.extractErrorDetails(metric.StatusDetail.SampleResponse, metric.StatusDetail.StatTags)
metric.StatusDetail.ErrorDetails = edr.extractErrorDetails(metric.StatusDetail.SampleResponse, metric.StatusDetail.StatTags)

edr.stats.NewTaggedStat("error_detail_reporting_failures", stats.CountType, stats.Tags{
"errorCode": errDets.ErrorCode,
"errorCode": metric.StatusDetail.ErrorDetails.Code,
"workspaceId": workspaceID,
"destType": destinationDetail.destType,
"sourceId": metric.ConnectionDetails.SourceID,
"destinationId": metric.ConnectionDetails.DestinationID,
}).Count(int(metric.StatusDetail.Count))

if edr.eventSamplingEnabled.Load() {
metric, err = transformMetricWithEventSampling(metric, reportedAt, edr.eventSampler, int64(edr.eventSamplingDuration.Load().Minutes()))
if err != nil {
return err
}
}

_, err = stmt.Exec(
workspaceID,
edr.namespace,
Expand All @@ -263,8 +287,8 @@ func (edr *ErrorDetailReporter) Report(ctx context.Context, metrics []*types.PUR
metric.StatusDetail.Count,
metric.StatusDetail.StatusCode,
metric.StatusDetail.EventType,
errDets.ErrorCode,
errDets.ErrorMessage,
metric.StatusDetail.ErrorDetails.Code,
metric.StatusDetail.ErrorDetails.Message,
metric.StatusDetail.SampleResponse,
string(metric.StatusDetail.SampleEvent),
metric.StatusDetail.EventName,
Expand Down Expand Up @@ -312,13 +336,13 @@ func (edr *ErrorDetailReporter) migrate(c types.SyncerConfig) (*sql.DB, error) {
return dbHandle, nil
}

func (edr *ErrorDetailReporter) extractErrorDetails(sampleResponse string, statTags map[string]string) errorDetails {
func (edr *ErrorDetailReporter) extractErrorDetails(sampleResponse string, statTags map[string]string) types.ErrorDetails {
errMsg := edr.errorDetailExtractor.GetErrorMessage(sampleResponse)
cleanedErrMsg := edr.errorDetailExtractor.CleanUpErrorMessage(errMsg)
errorCode := edr.errorDetailExtractor.GetErrorCode(cleanedErrMsg, statTags)
return errorDetails{
ErrorMessage: cleanedErrMsg,
ErrorCode: errorCode,
return types.ErrorDetails{
Message: cleanedErrMsg,
Code: errorCode,
}
}

Expand Down Expand Up @@ -776,4 +800,7 @@ func (edr *ErrorDetailReporter) sendMetric(ctx context.Context, label string, me
func (edr *ErrorDetailReporter) Stop() {
edr.cancel()
_ = edr.g.Wait()
if edr.eventSampler != nil {
edr.eventSampler.Close()
}
}
Loading
Loading