Skip to content

Commit

Permalink
Merge branch 'master' into feat.fetchRemoteSchemaAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Dec 23, 2024
2 parents e3d2f48 + a82aa47 commit 63cb186
Show file tree
Hide file tree
Showing 15 changed files with 123 additions and 70 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/verify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ jobs:
if: ${{ failure() }}
run: echo 'Not formatted files. Ensure you have run `make fmt` and committed the files locally.'
- name: Validate OpenAPI definition
uses: char0n/swagger-editor-validate@v1
uses: char0n/apidom-validate@v1
with:
definition-file: gateway/openapi.yaml

Expand Down
26 changes: 26 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# Changelog

## [1.39.2](https://github.com/rudderlabs/rudder-server/compare/v1.39.1...v1.39.2) (2024-12-18)


### Miscellaneous

* add error msg in the logs when gw req fails ([#5369](https://github.com/rudderlabs/rudder-server/issues/5369)) ([a0ef0f1](https://github.com/rudderlabs/rudder-server/commit/a0ef0f15ca456fc459233bf70c324cc60cfca7f0))

## [1.39.1](https://github.com/rudderlabs/rudder-server/compare/v1.39.0...v1.39.1) (2024-12-17)


### Bug Fixes

* processing pickup race condition ([#5374](https://github.com/rudderlabs/rudder-server/issues/5374)) ([21f0d13](https://github.com/rudderlabs/rudder-server/commit/21f0d13ffb64ff82fc147b46ca72b8e9c672b0ea))

## [1.39.0](https://github.com/rudderlabs/rudder-server/compare/v1.38.0...v1.39.0) (2024-12-10)


Expand Down Expand Up @@ -61,6 +75,18 @@
* upgrade major version pglock ([#5292](https://github.com/rudderlabs/rudder-server/issues/5292)) ([1f575ec](https://github.com/rudderlabs/rudder-server/commit/1f575ec2b5eac32df856ccf2d55ee2d40a516d8e))
* webhook integration tests commandline ([#5286](https://github.com/rudderlabs/rudder-server/issues/5286)) ([1d3ac29](https://github.com/rudderlabs/rudder-server/commit/1d3ac296789c2c65753df4f65ff80d971f4c3a0b))

## [1.38.4](https://github.com/rudderlabs/rudder-server/compare/v1.38.3...v1.38.4) (2024-12-04)


### Bug Fixes

* disable vacuum at startup for reporting ([#5325](https://github.com/rudderlabs/rudder-server/issues/5325)) ([d8fb8be](https://github.com/rudderlabs/rudder-server/commit/d8fb8be7a5ace45b6198579283b4f3b77cf71fbd))


### Miscellaneous

* remove full vacuum at flusher startup ([#5332](https://github.com/rudderlabs/rudder-server/issues/5332)) ([65ab23c](https://github.com/rudderlabs/rudder-server/commit/65ab23c79933d880dfef3dd8ff6e4a150f26435d))

## [1.38.3](https://github.com/rudderlabs/rudder-server/compare/v1.38.2...v1.38.3) (2024-11-20)


Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# syntax=docker/dockerfile:1

# GO_VERSION is updated automatically to match go.mod, see Makefile
ARG GO_VERSION=1.23.3
ARG GO_VERSION=1.23.4
ARG ALPINE_VERSION=3.20
FROM golang:${GO_VERSION}-alpine${ALPINE_VERSION} AS builder
ARG VERSION
Expand Down
15 changes: 9 additions & 6 deletions enterprise/reporting/error_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/stats/collectors"

obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"

migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
"github.com/rudderlabs/rudder-server/utils/httputil"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
Expand Down Expand Up @@ -160,12 +161,14 @@ func (edr *ErrorDetailReporter) DatabaseSyncer(c types.SyncerConfig) types.Repor
if !edr.config.GetBool("Reporting.errorReporting.syncer.enabled", true) {
return func() {}
}
if _, err := dbHandle.ExecContext(
context.Background(),
fmt.Sprintf("vacuum full analyze %s", pq.QuoteIdentifier(ErrorDetailReportsTable)),
); err != nil {
edr.log.Errorn("error full vacuuming", logger.NewStringField("table", ErrorDetailReportsTable), obskit.Error(err))
panic(err)
if edr.config.GetBool("Reporting.errorReporting.vacuumAtStartup", false) {
if _, err := dbHandle.ExecContext(
context.Background(),
fmt.Sprintf("vacuum full analyze %s", pq.QuoteIdentifier(ErrorDetailReportsTable)),
); err != nil {
edr.log.Errorn("error full vacuuming", logger.NewStringField("table", ErrorDetailReportsTable), obskit.Error(err))
panic(err)
}
}

return func() {
Expand Down
6 changes: 0 additions & 6 deletions enterprise/reporting/flusher/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,6 @@ func NewFlusher(db *sql.DB, log logger.Logger, stats stats.Stats, conf *config.C

f.initCommonTags()
f.initStats(f.commonTags)
if _, err := db.Exec(
fmt.Sprintf("vacuum full analyze %s", pq.QuoteIdentifier(table)),
); err != nil {
log.Errorn("error full vacuuming", logger.NewStringField("table", table), obskit.Error(err))
return nil, fmt.Errorf("error full vacuuming %s table %w", table, err)
}
return &f, nil
}

Expand Down
8 changes: 5 additions & 3 deletions enterprise/reporting/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,11 @@ func (r *DefaultReporter) DatabaseSyncer(c types.SyncerConfig) types.ReportingSy
if !config.GetBool("Reporting.syncer.enabled", true) {
return func() {}
}
if _, err := dbHandle.ExecContext(context.Background(), `vacuum full analyze reports;`); err != nil {
r.log.Errorn(`[ Reporting ]: Error full vacuuming reports table`, logger.NewErrorField(err))
panic(err)
if config.GetBool("Reporting.syncer.vacuumAtStartup", false) {
if _, err := dbHandle.ExecContext(context.Background(), `vacuum full analyze reports;`); err != nil {
r.log.Errorn(`[ Reporting ]: Error full vacuuming reports table`, logger.NewErrorField(err))
panic(err)
}
}
return func() {
r.g.Go(func() error {
Expand Down
31 changes: 28 additions & 3 deletions gateway/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,11 +709,11 @@ func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc {
errorMessage = err.Error()
status = response.GetErrorStatusCode(errorMessage)
responseBody = response.GetStatus(errorMessage)
gw.logger.Infon("response",
gw.logger.Errorn("response",
logger.NewStringField("ip", kithttputil.GetRequestIP(r)),
logger.NewStringField("path", r.URL.Path),
logger.NewIntField("status", int64(status)),
logger.NewStringField("body", responseBody),
obskit.Error(err),
)
gw.logger.Debugn("response",
logger.NewStringField("ip", kithttputil.GetRequestIP(r)),
Expand Down Expand Up @@ -763,13 +763,16 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt
if err != nil {
stat.RequestFailed(response.InvalidJSON)
stat.Report(gw.stats)
gw.logger.Errorn("invalid json in request",
obskit.Error(err))
return nil, errors.New(response.InvalidJSON)
}
gw.requestSizeStat.Observe(float64(len(body)))

if len(messages) == 0 {
stat.RequestFailed(response.NotRudderEvent)
stat.Report(gw.stats)
gw.logger.Errorn("no messages in request")
return nil, errors.New(response.NotRudderEvent)
}

Expand All @@ -779,7 +782,10 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt
stat := gwstats.SourceStat{ReqType: reqType}
err := gw.streamMsgValidator(&msg)
if err != nil {
gw.logger.Errorn("invalid message in request", logger.NewErrorField(err))
loggerFields := msg.Properties.LoggerFields()
loggerFields = append(loggerFields, obskit.Error(err))
gw.logger.Errorn("invalid message in request",
loggerFields...)
stat.RequestEventsFailed(1, response.InvalidStreamMessage)
stat.Report(gw.stats)
return nil, errors.New(response.InvalidStreamMessage)
Expand All @@ -793,6 +799,9 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt
if err != nil {
stat.RequestEventsFailed(1, response.NotRudderEvent)
stat.Report(gw.stats)
loggerFields := msg.Properties.LoggerFields()
loggerFields = append(loggerFields, obskit.Error(err))
gw.logger.Errorn("failed to set type in message", loggerFields...)
return nil, errors.New(response.NotRudderEvent)
}
}
Expand All @@ -806,19 +815,27 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt
if err != nil {
stat.RequestFailed(response.NotRudderEvent)
stat.Report(gw.stats)
gw.logger.Errorn("failed to set messageID in message",
obskit.Error(err))
return nil, errors.New(response.NotRudderEvent)
}
}
rudderId, err := getRudderId(userIDFromReq, anonIDFromReq)
if err != nil {
stat.RequestFailed(response.NotRudderEvent)
stat.Report(gw.stats)
gw.logger.Errorn("failed to get rudderId",
obskit.Error(err))
return nil, errors.New(response.NotRudderEvent)
}
msg.Payload, err = sjson.SetBytes(msg.Payload, "rudderId", rudderId.String())
if err != nil {
stat.RequestFailed(response.NotRudderEvent)
stat.Report(gw.stats)
loggerFields := msg.Properties.LoggerFields()
loggerFields = append(loggerFields, obskit.Error(err))
gw.logger.Errorn("failed to set rudderId in message",
loggerFields...)
return nil, errors.New(response.NotRudderEvent)
}
writeKey, ok := gw.getWriteKeyFromSourceID(msg.Properties.SourceID)
Expand Down Expand Up @@ -877,13 +894,17 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt
err = fmt.Errorf("filling receivedAt: %w", err)
stat.RequestEventsFailed(1, err.Error())
stat.Report(gw.stats)
gw.logger.Errorn("failed to fill receivedAt in message",
obskit.Error(err))
return nil, fmt.Errorf("filling receivedAt: %w", err)
}
msg.Payload, err = fillRequestIP(msg.Payload, msg.Properties.RequestIP)
if err != nil {
err = fmt.Errorf("filling request_ip: %w", err)
stat.RequestEventsFailed(1, err.Error())
stat.Report(gw.stats)
gw.logger.Errorn("failed to fill request_ip in message",
obskit.Error(err))
return nil, fmt.Errorf("filling request_ip: %w", err)
}

Expand All @@ -899,6 +920,10 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt
err = fmt.Errorf("marshalling event batch: %w", err)
stat.RequestEventsFailed(1, err.Error())
stat.Report(gw.stats)
loggerFields := msg.Properties.LoggerFields()
loggerFields = append(loggerFields, obskit.Error(err))
gw.logger.Errorn("failed to marshal event batch",
loggerFields...)
return nil, fmt.Errorf("marshalling event batch: %w", err)
}
jobUUID := uuid.New()
Expand Down
24 changes: 12 additions & 12 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/rudderlabs/rudder-server

go 1.23.3
go 1.23.4

// Addressing snyk vulnerabilities in indirect dependencies
// When upgrading a dependency, please make sure that
Expand Down Expand Up @@ -42,7 +42,7 @@ require (
github.com/denisenkom/go-mssqldb v0.12.3
github.com/dgraph-io/badger/v4 v4.5.0
github.com/docker/docker v27.4.0+incompatible
github.com/go-chi/chi/v5 v5.1.0
github.com/go-chi/chi/v5 v5.2.0
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-redis/redis/v8 v8.11.5
github.com/gocql/gocql v1.14.4
Expand All @@ -62,7 +62,7 @@ require (
github.com/lib/pq v1.10.9
github.com/linkedin/goavro/v2 v2.13.0
github.com/marcboeker/go-duckdb v1.8.3
github.com/minio/minio-go/v7 v7.0.81
github.com/minio/minio-go/v7 v7.0.82
github.com/mitchellh/mapstructure v1.5.0
github.com/olekukonko/tablewriter v0.0.5
github.com/onsi/ginkgo/v2 v2.22.0
Expand All @@ -78,10 +78,10 @@ require (
github.com/rudderlabs/compose-test v0.1.3
github.com/rudderlabs/rudder-go-kit v0.45.0
github.com/rudderlabs/rudder-observability-kit v0.0.3
github.com/rudderlabs/rudder-schemas v0.5.3
github.com/rudderlabs/rudder-schemas v0.5.4
github.com/rudderlabs/rudder-transformer/go v0.0.0-20240910055720-f77d2ab4125a
github.com/rudderlabs/sql-tunnels v0.1.7
github.com/rudderlabs/sqlconnect-go v1.13.0
github.com/rudderlabs/sqlconnect-go v1.14.0
github.com/samber/lo v1.47.0
github.com/segmentio/go-hll v1.0.1
github.com/segmentio/kafka-go v0.4.47
Expand All @@ -107,10 +107,10 @@ require (
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0
golang.org/x/oauth2 v0.24.0
golang.org/x/sync v0.10.0
google.golang.org/api v0.211.0
google.golang.org/api v0.212.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20241206012308-a4fef0638583
google.golang.org/grpc v1.68.1
google.golang.org/protobuf v1.35.2
google.golang.org/protobuf v1.36.0
)

require (
Expand All @@ -124,9 +124,9 @@ require (
require (
cel.dev/expr v0.16.1 // indirect
cloud.google.com/go v0.116.0 // indirect
cloud.google.com/go/auth v0.12.1 // indirect
cloud.google.com/go/auth v0.13.0 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.6 // indirect
cloud.google.com/go/compute/metadata v0.5.2 // indirect
cloud.google.com/go/compute/metadata v0.6.0 // indirect
cloud.google.com/go/iam v1.2.2 // indirect
cloud.google.com/go/monitoring v1.21.2 // indirect
dario.cat/mergo v1.0.1 // indirect
Expand Down Expand Up @@ -214,7 +214,7 @@ require (
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.22.1 // indirect
github.com/go-playground/validator/v10 v10.23.0 // indirect
github.com/go-sql-driver/mysql v1.8.1 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/go-viper/mapstructure/v2 v2.1.0 // indirect
Expand Down Expand Up @@ -338,11 +338,11 @@ require (
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.30.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.32.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/term v0.25.0 // indirect
golang.org/x/term v0.27.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.8.0
golang.org/x/tools v0.26.0 // indirect
Expand Down
Loading

0 comments on commit 63cb186

Please sign in to comment.