Skip to content

Commit

Permalink
fix: Fix high memory usages on import (#2046)
Browse files Browse the repository at this point in the history
  • Loading branch information
JanCizmar committed Dec 27, 2023
1 parent 33d9a23 commit 0b3554a
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import io.tolgee.util.logger
import jakarta.persistence.EntityManager
import org.hibernate.LockOptions
import org.springframework.beans.factory.InitializingBean
import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.context.annotation.Lazy
import org.springframework.context.event.EventListener
import org.springframework.data.redis.core.StringRedisTemplate
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.util.*
import java.util.concurrent.ConcurrentLinkedQueue

@Component
Expand All @@ -38,15 +38,20 @@ class BatchJobChunkExecutionQueue(
private val queue = ConcurrentLinkedQueue<ExecutionQueueItem>()
}

/**
 * Applies a [JobQueueItemsEvent] to the local queue.
 *
 * `ADD` events pass the items to [addItemsToLocalQueue]; `REMOVE` events remove every
 * queued item that is equal to one of the event's items.
 *
 * The event type is inferred from the method parameter, so `@EventListener` needs no
 * explicit class argument (and must be declared only once - it is not repeatable).
 */
@EventListener
fun onJobItemEvent(event: JobQueueItemsEvent) {
  when (event.type) {
    QueueEventType.ADD -> addItemsToLocalQueue(event.items)
    // Converted to a set so each membership check during removal is a hash lookup.
    QueueEventType.REMOVE -> queue.removeAll(event.items.toSet())
  }
}

/**
 * Populates the queue once, when the application reports that it is ready.
 *
 * This listener must not carry `@Scheduled`: scheduled methods may not accept arguments,
 * and the periodic refresh is already declared on [populateQueue] itself.
 */
@EventListener
fun onApplicationReady(event: ApplicationReadyEvent) {
  populateQueue()
}

@Scheduled(fixedDelay = 60000)
fun populateQueue() {
logger.debug("Running scheduled populate queue")
val data = entityManager.createQuery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,12 @@ class BatchJobProjectLockingManager(
}

/**
 * Chooses the job id for the given project from its incomplete jobs.
 *
 * Works on lightweight projections (job id + total chunk count) rather than full entities.
 *
 * @param projectId id of the project whose incomplete jobs are inspected
 * @return the id of the first incomplete job whose `totalChunks` differs from the number of
 * chunks returned for it by `getAllUnlockedChunksForJobs`; if there is no such job, the id of
 * the first incomplete job; `null` when the project has no incomplete jobs
 */
private fun getInitialJobId(projectId: Long): Long? {
  val jobs = batchJobService.getAllIncompleteJobIds(projectId)
  val unlockedChunkCounts = batchJobService
    .getAllUnlockedChunksForJobs(jobs.map { it.jobId })
    .groupingBy { it.batchJobId }
    .eachCount()
  // we are looking for a job that has already started and preferably for a locked one
  return jobs.find { it.totalChunks != unlockedChunkCounts[it.jobId] }?.jobId
    ?: jobs.firstOrNull()?.jobId
}

private fun getRedissonProjectLocks(): RMap<Long, Long> {
Expand Down
24 changes: 13 additions & 11 deletions backend/data/src/main/kotlin/io/tolgee/batch/BatchJobService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package io.tolgee.batch

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.tolgee.batch.data.AllIncompleteJobsResult
import io.tolgee.batch.data.BatchJobDto
import io.tolgee.batch.data.BatchJobType
import io.tolgee.batch.data.JobUnlockedChunk
import io.tolgee.batch.events.OnBatchJobCreated
import io.tolgee.component.CurrentDateProvider
import io.tolgee.constants.Message
Expand Down Expand Up @@ -230,7 +232,7 @@ class BatchJobService(
projectId,
userAccountId = getUserAccountIdForCurrentJobView(projectId),
oneHourAgo = currentDateProvider.date.addMinutes(-60),
completedStatuses = BatchJobStatus.values().filter { it.completed }
completedStatuses = BatchJobStatus.entries.filter { it.completed }
)

val progresses = getProgresses(jobs)
Expand Down Expand Up @@ -292,14 +294,14 @@ class BatchJobService(
return BatchJobView(job, progress, errorMessage)
}

fun getAllUnlockedChunksForJobs(jobIds: Iterable<Long>): MutableList<BatchJobChunkExecution> {
fun getAllUnlockedChunksForJobs(jobIds: Iterable<Long>): List<JobUnlockedChunk> {
return entityManager.createQuery(
"""
select new io.tolgee.batch.data.JobUnlockedChunk(bjce.batchJob.id, bjce.id)
from BatchJobChunkExecution bjce
join fetch bjce.batchJob bk
where bjce.batchJob.id in :jobIds
""".trimIndent(),
BatchJobChunkExecution::class.java
""",
JobUnlockedChunk::class.java
)
.setParameter("jobIds", jobIds)
.setHint(
Expand Down Expand Up @@ -369,16 +371,16 @@ class BatchJobService(
.resultList
}

/**
 * Loads the id and total chunk count of every job in the project whose status is not one of
 * the completed statuses.
 *
 * The query selects into [AllIncompleteJobsResult] via a constructor expression, so no
 * `BatchJob` entities are materialized.
 *
 * @param projectId id of the project to search in
 * @return one [AllIncompleteJobsResult] per incomplete job
 */
fun getAllIncompleteJobIds(projectId: Long): List<AllIncompleteJobsResult> {
  return entityManager.createQuery(
    """select new io.tolgee.batch.data.AllIncompleteJobsResult(j.id, j.totalChunks) from BatchJob j
      where j.project.id = :projectId
      and j.status not in :completedStatuses
    """,
    AllIncompleteJobsResult::class.java
  )
    .setParameter("projectId", projectId)
    .setParameter("completedStatuses", BatchJobStatus.entries.filter { it.completed })
    .resultList
}

Expand All @@ -397,7 +399,7 @@ class BatchJobService(
/**
 * Delegates to `batchJobRepository.getCompletedJobs` with the given job ids and date.
 *
 * @param lockedJobIds ids of the jobs to look among
 * @param before date passed to the repository query as its upper bound
 */
fun getJobsCompletedBefore(lockedJobIds: Iterable<Long>, before: Date): List<BatchJob> {
  return batchJobRepository.getCompletedJobs(lockedJobIds, before)
}

/**
 * Returns the ids of the given jobs that the repository reports as stuck, using a cut-off of
 * two minutes before the current date.
 *
 * Only ids are returned, so no `BatchJob` entities are loaded.
 *
 * @param jobIds ids of the jobs to check
 */
fun getStuckJobIds(jobIds: MutableSet<Long>): List<Long> {
  val stuckBefore = currentDateProvider.date.addMinutes(-2)
  return batchJobRepository.getStuckJobIds(jobIds, stuckBefore)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ class ScheduledJobCleaner(
}

/**
 * Removes the cached state of every job that `getStuckJobIds` reports as stuck among the
 * currently cached job ids, logging a warning for each one.
 */
private fun handleStuckJobs() {
  batchJobService.getStuckJobIds(batchJobStateProvider.getCachedJobIds()).forEach { jobId ->
    // The id must be interpolated; a bare "it" in the literal would log the word, not the id.
    logger.warn("Removing stuck job state $jobId using scheduled task")
    batchJobStateProvider.removeJobState(jobId)
  }
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.tolgee.batch.data

/**
 * Lightweight projection of a batch job: just its id and its total number of chunks.
 *
 * @property jobId id of the batch job
 * @property totalChunks total chunk count stored on the job
 */
data class AllIncompleteJobsResult(
  val jobId: Long,
  val totalChunks: Int,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.tolgee.batch.data

/**
 * Lightweight projection pairing a chunk execution with the batch job it belongs to.
 *
 * @property batchJobId id of the owning batch job
 * @property batchJobChunkExecutionId id of the chunk execution
 */
data class JobUnlockedChunk(
  val batchJobId: Long,
  val batchJobChunkExecutionId: Long,
)
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,17 @@ interface BatchJobRepository : JpaRepository<BatchJob, Long> {

/**
 * Returns the ids of those jobs among [jobIds] whose most recently updated chunk execution
 * was updated before [before].
 *
 * Only `j.id` is selected, so the result is a list of ids rather than loaded entities.
 * Because the query inner-joins chunk executions, a job with no chunk executions is never
 * returned.
 */
@Query(
  """
    select j.id from BatchJob j
    join BatchJobChunkExecution bjce on bjce.batchJob.id = j.id
    where j.id in :jobIds
    group by j.id
    having max(bjce.updatedAt) < :before
  """
)
fun getStuckJobIds(
  jobIds: MutableSet<Long>,
  before: Date,
): List<Long>

@Query(
"""
Expand Down

0 comments on commit 0b3554a

Please sign in to comment.