Skip to content

Commit

Permalink
chore: sync release v1.39.1 to main branch (#5377)
Browse files Browse the repository at this point in the history
  • Loading branch information
devops-github-rudderstack authored Dec 17, 2024
1 parent d0ce669 commit dd33fee
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## [1.39.1](https://github.com/rudderlabs/rudder-server/compare/v1.39.0...v1.39.1) (2024-12-17)


### Bug Fixes

* processing pickup race condition ([#5374](https://github.com/rudderlabs/rudder-server/issues/5374)) ([21f0d13](https://github.com/rudderlabs/rudder-server/commit/21f0d13ffb64ff82fc147b46ca72b8e9c672b0ea))

## [1.39.0](https://github.com/rudderlabs/rudder-server/compare/v1.38.0...v1.39.0) (2024-12-10)


Expand Down
11 changes: 10 additions & 1 deletion warehouse/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type Router struct {
inProgressMap map[workerIdentifierMapKey][]jobID
inProgressMapLock sync.RWMutex

processingMu sync.Mutex

scheduledTimesCache map[string][]int
scheduledTimesCacheLock sync.RWMutex

Expand Down Expand Up @@ -371,18 +373,22 @@ loop:
continue
}

r.processingMu.Lock()
inProgressNamespaces := r.getInProgressNamespaces()
r.logger.Debugf(`Current inProgress namespace identifiers for %s: %v`, r.destType, inProgressNamespaces)

uploadJobsToProcess, err := r.uploadsToProcess(ctx, availableWorkers, inProgressNamespaces)
if err != nil && ctx.Err() == nil {
r.logger.Errorn("Error getting uploads to process", logger.NewErrorField(err))
r.processingMu.Unlock()
return err
}

for _, uploadJob := range uploadJobsToProcess {
r.setDestInProgress(uploadJob.warehouse, uploadJob.upload.ID)
}
r.processingMu.Unlock()

for _, uploadJob := range uploadJobsToProcess {
workerName := r.workerIdentifier(uploadJob.warehouse)

r.workerChannelMapLock.RLock()
Expand Down Expand Up @@ -596,6 +602,9 @@ func (r *Router) handlePriorityForWaitingUploads(ctx context.Context, warehouse
return defaultUploadPriority, nil
}

r.processingMu.Lock()
defer r.processingMu.Unlock()

// If it is present do nothing else delete it
if _, inProgress := r.isUploadJobInProgress(warehouse, latestInfo.ID); !inProgress {
if err := r.uploadRepo.DeleteWaiting(ctx, latestInfo.ID); err != nil {
Expand Down

0 comments on commit dd33fee

Please sign in to comment.