Skip to content

Commit

Permalink
Merge branch 'master' of github.com:rudderlabs/rudder-server into feat.httplb
Browse files (browse the repository at this point in the history)
  • Loading branch information
lvrach committed Jan 2, 2025
2 parents b78648b + 18f4bdf commit f4234fd
Show file tree
Hide file tree
Showing 33 changed files with 1,585 additions and 1,107 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/verify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ jobs:
if: ${{ failure() }}
run: echo 'Not formatted files. Ensure you have run `make fmt` and committed the files locally.'
- name: Validate OpenAPI definition
uses: char0n/swagger-editor-validate@v1
uses: char0n/apidom-validate@v1
with:
definition-file: gateway/openapi.yaml

Expand Down
33 changes: 33 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,26 @@
# Changelog

## [1.39.3](https://github.com/rudderlabs/rudder-server/compare/v1.39.2...v1.39.3) (2024-12-23)


### Bug Fixes

* replay tracking plan bug ([#5389](https://github.com/rudderlabs/rudder-server/issues/5389)) ([74056f8](https://github.com/rudderlabs/rudder-server/commit/74056f8211b3ed5b4c3d0485753401371bc53751))

## [1.39.2](https://github.com/rudderlabs/rudder-server/compare/v1.39.1...v1.39.2) (2024-12-18)


### Miscellaneous

* add error msg in the logs when gw req fails ([#5369](https://github.com/rudderlabs/rudder-server/issues/5369)) ([a0ef0f1](https://github.com/rudderlabs/rudder-server/commit/a0ef0f15ca456fc459233bf70c324cc60cfca7f0))

## [1.39.1](https://github.com/rudderlabs/rudder-server/compare/v1.39.0...v1.39.1) (2024-12-17)


### Bug Fixes

* processing pickup race condition ([#5374](https://github.com/rudderlabs/rudder-server/issues/5374)) ([21f0d13](https://github.com/rudderlabs/rudder-server/commit/21f0d13ffb64ff82fc147b46ca72b8e9c672b0ea))

## [1.39.0](https://github.com/rudderlabs/rudder-server/compare/v1.38.0...v1.39.0) (2024-12-10)


Expand Down Expand Up @@ -61,6 +82,18 @@
* upgrade major version pglock ([#5292](https://github.com/rudderlabs/rudder-server/issues/5292)) ([1f575ec](https://github.com/rudderlabs/rudder-server/commit/1f575ec2b5eac32df856ccf2d55ee2d40a516d8e))
* webhook integration tests commandline ([#5286](https://github.com/rudderlabs/rudder-server/issues/5286)) ([1d3ac29](https://github.com/rudderlabs/rudder-server/commit/1d3ac296789c2c65753df4f65ff80d971f4c3a0b))

## [1.38.4](https://github.com/rudderlabs/rudder-server/compare/v1.38.3...v1.38.4) (2024-12-04)


### Bug Fixes

* disable vacuum at startup for reporting ([#5325](https://github.com/rudderlabs/rudder-server/issues/5325)) ([d8fb8be](https://github.com/rudderlabs/rudder-server/commit/d8fb8be7a5ace45b6198579283b4f3b77cf71fbd))


### Miscellaneous

* remove full vacuum at flusher startup ([#5332](https://github.com/rudderlabs/rudder-server/issues/5332)) ([65ab23c](https://github.com/rudderlabs/rudder-server/commit/65ab23c79933d880dfef3dd8ff6e4a150f26435d))

## [1.38.3](https://github.com/rudderlabs/rudder-server/compare/v1.38.2...v1.38.3) (2024-11-20)


Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# syntax=docker/dockerfile:1

# GO_VERSION is updated automatically to match go.mod, see Makefile
ARG GO_VERSION=1.23.3
ARG GO_VERSION=1.23.4
ARG ALPINE_VERSION=3.20
FROM golang:${GO_VERSION}-alpine${ALPINE_VERSION} AS builder
ARG VERSION
Expand Down
72 changes: 51 additions & 21 deletions enterprise/reporting/error_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/stats/collectors"

obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
"github.com/rudderlabs/rudder-server/enterprise/reporting/event_sampler"
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
"github.com/rudderlabs/rudder-server/utils/httputil"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
Expand Down Expand Up @@ -86,16 +87,14 @@ type ErrorDetailReporter struct {
minReportedAtQueryTime stats.Measurement
errorDetailReportsQueryTime stats.Measurement
edReportingRequestLatency stats.Measurement
eventSamplingEnabled config.ValueLoader[bool]
eventSamplingDuration config.ValueLoader[time.Duration]
eventSampler event_sampler.EventSampler

stats stats.Stats
config *config.Config
}

// errorDetails pairs the error code with the cleaned-up error message that
// extractErrorDetails derives from a sample response.
type errorDetails struct {
	ErrorCode, ErrorMessage string
}

func NewErrorDetailReporter(
ctx context.Context,
configSubscriber *configSubscriber,
Expand All @@ -111,11 +110,26 @@ func NewErrorDetailReporter(
sleepInterval := conf.GetReloadableDurationVar(30, time.Second, "Reporting.sleepInterval")
maxConcurrentRequests := conf.GetReloadableIntVar(32, 1, "Reporting.maxConcurrentRequests")
maxOpenConnections := conf.GetIntVar(16, 1, "Reporting.errorReporting.maxOpenConnections")
eventSamplingEnabled := conf.GetReloadableBoolVar(false, "Reporting.errorReporting.eventSampling.enabled")
eventSamplingDuration := conf.GetReloadableDurationVar(60, time.Minute, "Reporting.eventSampling.durationInMinutes")
eventSamplerType := conf.GetReloadableStringVar("badger", "Reporting.eventSampling.type")
eventSamplingCardinality := conf.GetReloadableIntVar(100000, 1, "Reporting.eventSampling.cardinality")

log := logger.NewLogger().Child("enterprise").Child("error-detail-reporting")
extractor := NewErrorDetailExtractor(log)
ctx, cancel := context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)

var eventSampler event_sampler.EventSampler

if eventSamplingEnabled.Load() {
var err error
eventSampler, err = event_sampler.NewEventSampler(ctx, eventSamplingDuration, eventSamplerType, eventSamplingCardinality, event_sampler.BadgerEventSamplerErrorsPathName, conf, log)
if err != nil {
panic(err)
}
}

return &ErrorDetailReporter{
ctx: ctx,
cancel: cancel,
Expand All @@ -128,6 +142,10 @@ func NewErrorDetailReporter(
vacuumFull: conf.GetReloadableBoolVar(true, "Reporting.errorReporting.vacuumFull", "Reporting.vacuumFull"),
httpClient: netClient,

eventSamplingEnabled: eventSamplingEnabled,
eventSamplingDuration: eventSamplingDuration,
eventSampler: eventSampler,

namespace: config.GetKubeNamespace(),
instanceID: conf.GetString("INSTANCE_ID", "1"),
region: conf.GetString("region", ""),
Expand Down Expand Up @@ -160,12 +178,14 @@ func (edr *ErrorDetailReporter) DatabaseSyncer(c types.SyncerConfig) types.Repor
if !edr.config.GetBool("Reporting.errorReporting.syncer.enabled", true) {
return func() {}
}
if _, err := dbHandle.ExecContext(
context.Background(),
fmt.Sprintf("vacuum full analyze %s", pq.QuoteIdentifier(ErrorDetailReportsTable)),
); err != nil {
edr.log.Errorn("error full vacuuming", logger.NewStringField("table", ErrorDetailReportsTable), obskit.Error(err))
panic(err)
if edr.config.GetBool("Reporting.errorReporting.vacuumAtStartup", false) {
if _, err := dbHandle.ExecContext(
context.Background(),
fmt.Sprintf("vacuum full analyze %s", pq.QuoteIdentifier(ErrorDetailReportsTable)),
); err != nil {
edr.log.Errorn("error full vacuuming", logger.NewStringField("table", ErrorDetailReportsTable), obskit.Error(err))
panic(err)
}
}

return func() {
Expand Down Expand Up @@ -198,7 +218,7 @@ func (edr *ErrorDetailReporter) GetSyncer(syncerKey string) *types.SyncSource {
return edr.syncers[syncerKey]
}

func shouldReport(metric *types.PUReportedMetric) bool {
func shouldReport(metric types.PUReportedMetric) bool {
switch {
case metric.StatusDetail.StatusCode >= http.StatusBadRequest, metric.StatusDetail.StatusCode == types.FilterEventCode, metric.StatusDetail.StatusCode == types.SuppressEventCode:
return true
Expand All @@ -222,12 +242,12 @@ func (edr *ErrorDetailReporter) Report(ctx context.Context, metrics []*types.PUR

reportedAt := time.Now().UTC().Unix() / 60
for _, metric := range metrics {
metric := *metric
if !shouldReport(metric) {
continue
}

workspaceID := edr.configSubscriber.WorkspaceIDFromSource(metric.ConnectionDetails.SourceID)
metric := *metric
if edr.IsPIIReportingDisabled(workspaceID) {
edr.log.Debugn("PII setting is disabled for workspaceId:", obskit.WorkspaceID(workspaceID))
return nil
Expand All @@ -236,16 +256,23 @@ func (edr *ErrorDetailReporter) Report(ctx context.Context, metrics []*types.PUR
edr.log.Debugn("DestinationId & DestDetail details", obskit.DestinationID(metric.ConnectionDetails.DestinationID), logger.NewField("destinationDetail", destinationDetail))

// extract error-message & error-code
errDets := edr.extractErrorDetails(metric.StatusDetail.SampleResponse, metric.StatusDetail.StatTags)
metric.StatusDetail.ErrorDetails = edr.extractErrorDetails(metric.StatusDetail.SampleResponse, metric.StatusDetail.StatTags)

edr.stats.NewTaggedStat("error_detail_reporting_failures", stats.CountType, stats.Tags{
"errorCode": errDets.ErrorCode,
"errorCode": metric.StatusDetail.ErrorDetails.Code,
"workspaceId": workspaceID,
"destType": destinationDetail.destType,
"sourceId": metric.ConnectionDetails.SourceID,
"destinationId": metric.ConnectionDetails.DestinationID,
}).Count(int(metric.StatusDetail.Count))

if edr.eventSamplingEnabled.Load() {
metric, err = transformMetricWithEventSampling(metric, reportedAt, edr.eventSampler, int64(edr.eventSamplingDuration.Load().Minutes()))
if err != nil {
return err
}
}

_, err = stmt.Exec(
workspaceID,
edr.namespace,
Expand All @@ -260,8 +287,8 @@ func (edr *ErrorDetailReporter) Report(ctx context.Context, metrics []*types.PUR
metric.StatusDetail.Count,
metric.StatusDetail.StatusCode,
metric.StatusDetail.EventType,
errDets.ErrorCode,
errDets.ErrorMessage,
metric.StatusDetail.ErrorDetails.Code,
metric.StatusDetail.ErrorDetails.Message,
metric.StatusDetail.SampleResponse,
string(metric.StatusDetail.SampleEvent),
metric.StatusDetail.EventName,
Expand Down Expand Up @@ -309,13 +336,13 @@ func (edr *ErrorDetailReporter) migrate(c types.SyncerConfig) (*sql.DB, error) {
return dbHandle, nil
}

func (edr *ErrorDetailReporter) extractErrorDetails(sampleResponse string, statTags map[string]string) errorDetails {
func (edr *ErrorDetailReporter) extractErrorDetails(sampleResponse string, statTags map[string]string) types.ErrorDetails {
errMsg := edr.errorDetailExtractor.GetErrorMessage(sampleResponse)
cleanedErrMsg := edr.errorDetailExtractor.CleanUpErrorMessage(errMsg)
errorCode := edr.errorDetailExtractor.GetErrorCode(cleanedErrMsg, statTags)
return errorDetails{
ErrorMessage: cleanedErrMsg,
ErrorCode: errorCode,
return types.ErrorDetails{
Message: cleanedErrMsg,
Code: errorCode,
}
}

Expand Down Expand Up @@ -773,4 +800,7 @@ func (edr *ErrorDetailReporter) sendMetric(ctx context.Context, label string, me
// Stop shuts the reporter down: it invokes the reporter's cancel function,
// waits on edr.g, and finally closes the event sampler when one is set.
func (edr *ErrorDetailReporter) Stop() {
	edr.cancel()
	// The result of Wait is deliberately discarded during shutdown.
	_ = edr.g.Wait()

	sampler := edr.eventSampler
	if sampler == nil {
		// Nothing to close: no event sampler was attached to this reporter.
		return
	}
	sampler.Close()
}
Loading

0 comments on commit f4234fd

Please sign in to comment.