Skip to content

Commit

Permalink
fix: processing pickup race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Dec 16, 2024
1 parent aef8585 commit 25b87a6
Showing 1 changed file with 10 additions and 1 deletion.
11 changes: 10 additions & 1 deletion warehouse/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type Router struct {
inProgressMap map[workerIdentifierMapKey][]jobID
inProgressMapLock sync.RWMutex

processingMu sync.Mutex

scheduledTimesCache map[string][]int
scheduledTimesCacheLock sync.RWMutex

Expand Down Expand Up @@ -371,6 +373,7 @@ loop:
continue
}

r.processingMu.Lock()
inProgressNamespaces := r.getInProgressNamespaces()
r.logger.Debugf(`Current inProgress namespace identifiers for %s: %v`, r.destType, inProgressNamespaces)

Expand All @@ -379,10 +382,12 @@ loop:
r.logger.Errorn("Error getting uploads to process", logger.NewErrorField(err))
return err
}

for _, uploadJob := range uploadJobsToProcess {
r.setDestInProgress(uploadJob.warehouse, uploadJob.upload.ID)
}
r.processingMu.Unlock()

for _, uploadJob := range uploadJobsToProcess {
workerName := r.workerIdentifier(uploadJob.warehouse)

r.workerChannelMapLock.RLock()
Expand Down Expand Up @@ -596,6 +601,10 @@ func (r *Router) handlePriorityForWaitingUploads(ctx context.Context, warehouse
return defaultUploadPriority, nil
}

// Taking a processing lock in so that in progress namespace should not be deleted
r.processingMu.Lock()
defer r.processingMu.Unlock()

// If it is present do nothing else delete it
if _, inProgress := r.isUploadJobInProgress(warehouse, latestInfo.ID); !inProgress {
if err := r.uploadRepo.DeleteWaiting(ctx, latestInfo.ID); err != nil {
Expand Down

0 comments on commit 25b87a6

Please sign in to comment.