Skip to content

Commit

Permalink
fix: Fix slow cancellation of batch operation (#2047)
Browse files Browse the repository at this point in the history
  • Loading branch information
JanCizmar authored Dec 27, 2023
1 parent 9054dbe commit 58a43ca
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.mock.mockito.SpyBean
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.CoroutineContext
Expand Down Expand Up @@ -127,7 +126,7 @@ class BatchJobManagementControllerTest : ProjectAuthControllerTest("/v2/projects
while (true) {
val context = it.arguments[2] as CoroutineContext
context.ensureActive()
Thread.sleep(100)
Thread.sleep(10)
}
}
it.callRealMethod()
Expand All @@ -143,7 +142,7 @@ class BatchJobManagementControllerTest : ProjectAuthControllerTest("/v2/projects
)
).andIsOk

Thread.sleep(2000)
Thread.sleep(500)

val job = getSingleJob()
performProjectAuthPut("batch-jobs/${job.id}/cancel")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ class BatchJobCancellationManager(
}
logger.debug("""Job $jobId cancellation committed. setting transaction committed for the executions.""")
executions.forEach {
progressManager.handleChunkCompletedCommitted(it)
progressManager.handleChunkCompletedCommitted(it, false)
}

progressManager.onJobCompletedCommitted(jobId)
}

fun cancelExecution(execution: BatchJobChunkExecution) {
Expand Down
23 changes: 13 additions & 10 deletions backend/data/src/main/kotlin/io/tolgee/batch/ProgressManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class ProgressManager(
)
}

fun handleChunkCompletedCommitted(execution: BatchJobChunkExecution) {
fun handleChunkCompletedCommitted(execution: BatchJobChunkExecution, triggerJobCompleted: Boolean = true) {
val state = batchJobStateProvider.updateState(execution.batchJob.id) {
logger.debug("Setting transaction committed for chunk execution ${execution.id} to true")
it.compute(execution.id) { _, v ->
Expand All @@ -118,18 +118,21 @@ class ProgressManager(
"(current execution: ${execution.id}), " +
"incomplete executions: $incompleteExecutions"
}
if (isJobCompleted) {
onJobCompletedCommitted(execution)
if (isJobCompleted && triggerJobCompleted) {
onJobCompletedCommitted(execution.batchJob.id)
}
}

private fun onJobCompletedCommitted(execution: BatchJobChunkExecution) {
batchJobStateProvider.removeJobState(execution.batchJob.id)
val jobDto = batchJobService.getJobDto(execution.batchJob.id)
eventPublisher.publishEvent(OnBatchJobStatusUpdated(jobDto.id, jobDto.projectId, jobDto.status))
cachingBatchJobService.evictJobCache(execution.batchJob.id)
batchJobProjectLockingManager.unlockJobForProject(jobDto.projectId, jobDto.id)
batchJobStateProvider.removeJobState(execution.batchJob.id)
fun onJobCompletedCommitted(jobId: Long) {
val jobDto = batchJobService.getJobDto(jobId)
onJobCompletedCommitted(jobDto)
}

fun onJobCompletedCommitted(batchJob: BatchJobDto) {
eventPublisher.publishEvent(OnBatchJobStatusUpdated(batchJob.id, batchJob.projectId, batchJob.status))
cachingBatchJobService.evictJobCache(batchJob.id)
batchJobProjectLockingManager.unlockJobForProject(batchJob.projectId, batchJob.id)
batchJobStateProvider.removeJobState(batchJob.id)
}

fun handleJobStatus(
Expand Down

0 comments on commit 58a43ca

Please sign in to comment.