diff --git a/CHANGELOG.md b/CHANGELOG.md index fa1cc7fc5e..d7ee5b9a15 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## [1.39.1](https://github.com/rudderlabs/rudder-server/compare/v1.39.0...v1.39.1) (2024-12-17) + + +### Bug Fixes + +* processing pickup race condition ([#5374](https://github.com/rudderlabs/rudder-server/issues/5374)) ([21f0d13](https://github.com/rudderlabs/rudder-server/commit/21f0d13ffb64ff82fc147b46ca72b8e9c672b0ea)) + ## [1.39.0](https://github.com/rudderlabs/rudder-server/compare/v1.38.0...v1.39.0) (2024-12-10) diff --git a/warehouse/router/router.go b/warehouse/router/router.go index 7c02155477..b2e66d249d 100644 --- a/warehouse/router/router.go +++ b/warehouse/router/router.go @@ -74,6 +74,8 @@ type Router struct { inProgressMap map[workerIdentifierMapKey][]jobID inProgressMapLock sync.RWMutex + processingMu sync.Mutex + scheduledTimesCache map[string][]int scheduledTimesCacheLock sync.RWMutex @@ -371,18 +373,22 @@ loop: continue } + r.processingMu.Lock() inProgressNamespaces := r.getInProgressNamespaces() r.logger.Debugf(`Current inProgress namespace identifiers for %s: %v`, r.destType, inProgressNamespaces) uploadJobsToProcess, err := r.uploadsToProcess(ctx, availableWorkers, inProgressNamespaces) if err != nil && ctx.Err() == nil { r.logger.Errorn("Error getting uploads to process", logger.NewErrorField(err)) + r.processingMu.Unlock() return err } - for _, uploadJob := range uploadJobsToProcess { r.setDestInProgress(uploadJob.warehouse, uploadJob.upload.ID) + } + r.processingMu.Unlock() + for _, uploadJob := range uploadJobsToProcess { workerName := r.workerIdentifier(uploadJob.warehouse) r.workerChannelMapLock.RLock() @@ -596,6 +602,9 @@ func (r *Router) handlePriorityForWaitingUploads(ctx context.Context, warehouse return defaultUploadPriority, nil } + r.processingMu.Lock() + defer r.processingMu.Unlock() + // If it is present do nothing else delete it if _, inProgress := r.isUploadJobInProgress(warehouse, latestInfo.ID); !inProgress { if err := r.uploadRepo.DeleteWaiting(ctx, latestInfo.ID); err != nil {