Skip to content

Commit

Permalink
fix: Batch operations improvements
Browse files Browse the repository at this point in the history
- Rate limits for Tolgee Translator
- Job canelation fixed
- dynamic operation target
- split multilanguage machine translation
  • Loading branch information
JanCizmar committed Jul 29, 2023
1 parent 74de3f0 commit 2896866
Show file tree
Hide file tree
Showing 39 changed files with 526 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ When no languages provided, it translates only untranslated languages."""
key = key,
languageTags = languages?.toList(),
useTranslationMemory = useTranslationMemory ?: false,
useMachineTranslation = useMachineTranslation ?: false
useMachineTranslation = useMachineTranslation ?: false,
isBatch = true
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ abstract class AbstractCacheTest : AbstractSpringTest() {
keyName = "key-name",
sourceLanguageTag = "en",
targetLanguageTag = "de",
serviceType = MtServiceType.GOOGLE
serviceType = MtServiceType.GOOGLE,
isBatch = false
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import io.tolgee.batch.events.OnBatchJobFailed
import io.tolgee.batch.events.OnBatchJobSucceeded
import io.tolgee.batch.state.BatchJobStateProvider
import io.tolgee.fixtures.waitFor
import io.tolgee.util.Logging
import io.tolgee.util.logger
import org.springframework.context.event.EventListener
import org.springframework.stereotype.Component
import javax.persistence.EntityManager
Expand All @@ -15,7 +17,7 @@ class BatchJobActivityFinalizer(
private val entityManager: EntityManager,
private val activityHolder: ActivityHolder,
private val batchJobStateProvider: BatchJobStateProvider,
) {
) : Logging {
@EventListener(OnBatchJobSucceeded::class)
fun finalizeActivityWhenJobSucceeded(event: OnBatchJobSucceeded) {
finalizeActivityWhenJobCompleted(event.job)
Expand Down Expand Up @@ -53,6 +55,7 @@ class BatchJobActivityFinalizer(
waitFor(20000) {
val committedChunks = batchJobStateProvider.get(job.id).values
.count { !it.retry && it.transactionCommitted && it.status.completed }
logger.debug("Waitinng for completed chunks ($committedChunks) to be equal to all other chunks count (${job.totalChunks - 1})")
committedChunks == job.totalChunks - 1
}
}
Expand Down Expand Up @@ -126,12 +129,13 @@ class BatchJobActivityFinalizer(
group by entity_class, entity_id
having count(*) > 1)
and
activity_revision_id not in (select min(activity_revision_id)
from activity_describing_entity
where activity_revision_id in (:revisionIds)
or activity_revision_id = :activityRevisionIdToMergeInto
group by entity_class, entity_id
having count(*) > 1)
(activity_revision_id, entity_class, entity_id) not in (
select min(activity_revision_id), entity_class, entity_id
from activity_describing_entity
where activity_revision_id in (:revisionIds)
or activity_revision_id = :activityRevisionIdToMergeInto
group by entity_class, entity_id
having count(*) > 1)
""".trimIndent()
)
.setParameter("activityRevisionIdToMergeInto", activityRevisionIdToMergeInto)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class BatchJobCancellationManager(
}

fun cancelJob(jobId: Long) {
executeInNewTransaction(
val executions = executeInNewTransaction(
transactionManager = transactionManager,
isolationLevel = TransactionDefinition.ISOLATION_DEFAULT
) {
Expand All @@ -71,9 +71,13 @@ class BatchJobCancellationManager(
executions.forEach { execution ->
execution.status = BatchJobChunkExecutionStatus.CANCELLED
entityManager.persist(execution)
progressManager.handleProgress(execution)
}

executions.forEach { progressManager.handleProgress(it) }
executions
}
executions.forEach {
progressManager.handleChunkCompletedCommitted(it)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,11 @@ class BatchJobConcurrentLauncher(

private fun logItemsPulled(items: List<ExecutionQueueItem>) {
if (items.isNotEmpty()) {
logger.debug(
logger.trace(
"Pulled ${items.size} items from queue: " +
items.joinToString(", ") { it.chunkExecutionId.toString() }
)
logger.debug(
logger.trace(
"${batchJobChunkExecutionQueue.size} is left in the queue " +
"(${System.identityHashCode(batchJobChunkExecutionQueue)}): " +
batchJobChunkExecutionQueue.joinToString(", ") { it.chunkExecutionId.toString() }
Expand All @@ -130,7 +130,7 @@ class BatchJobConcurrentLauncher(
processExecution: (executionItem: ExecutionQueueItem, coroutineContext: CoroutineContext) -> Unit
) {
if (!executionItem.isTimeToExecute()) {
logger.debug(
logger.trace(
"""Execution ${executionItem.chunkExecutionId} not ready to execute, adding back to queue:
| Difference ${executionItem.executeAfter!! - currentDateProvider.date.time}""".trimMargin()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class BatchJobDto(
override var id: Long,
val projectId: Long,
val authorId: Long?,
val target: List<Long>,
val target: List<Any>,
val totalItems: Int,
val totalChunks: Int,
val chunkSize: Int,
Expand Down
11 changes: 5 additions & 6 deletions backend/data/src/main/kotlin/io/tolgee/batch/BatchJobService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ class BatchJobService(
) : Logging {

@Transactional
fun <RequestType> startJob(
request: RequestType,
fun startJob(
request: Any,
project: Project,
author: UserAccount?,
type: BatchJobType
): BatchJob {
var executions: List<BatchJobChunkExecution>? = null
val job = executeInNewTransaction(transactionManager) {
val processor = getProcessor<RequestType, Any>(type)
val processor = getProcessor(type)
val target = processor.getTarget(request)

val job = BatchJob().apply {
Expand Down Expand Up @@ -187,9 +187,8 @@ class BatchJobService(
return BatchJobView(job, progress, errorMessage)
}

@Suppress("USELESS_CAST")
fun <RequestType, ParamsType> getProcessor(type: BatchJobType): ChunkProcessor<RequestType, ParamsType> =
applicationContext.getBean(type.processor.java) as ChunkProcessor<RequestType, ParamsType>
fun getProcessor(type: BatchJobType): ChunkProcessor<Any, Any, Any> =
applicationContext.getBean(type.processor.java) as ChunkProcessor<Any, Any, Any>

fun deleteAllByProjectId(projectId: Long) {
val batchJobs = getAllByProjectId(projectId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ enum class BatchJobType(
*/
val chunkSize: Int,
val maxRetries: Int,
val processor: KClass<out ChunkProcessor<*, *>>,
val processor: KClass<out ChunkProcessor<*, *, *>>,
val defaultRetryWaitTimeInMs: Int = 2000,
) {
PRE_TRANSLATE_BY_MT(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package io.tolgee.batch

data class BatchTranslationTargetItem(val keyId: Long, val languageId: Long)
61 changes: 51 additions & 10 deletions backend/data/src/main/kotlin/io/tolgee/batch/ChunkProcessingUtil.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package io.tolgee.batch

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.sentry.Sentry
import io.tolgee.activity.ActivityHolder
import io.tolgee.component.CurrentDateProvider
import io.tolgee.component.machineTranslation.TranslationApiRateLimitException
import io.tolgee.exceptions.ExceptionWithMessage
import io.tolgee.exceptions.OutOfCreditsException
import io.tolgee.model.batch.BatchJobChunkExecution
import io.tolgee.model.batch.BatchJobChunkExecutionStatus
import io.tolgee.util.Logging
import io.tolgee.util.logger
import org.apache.commons.lang3.exception.ExceptionUtils
import org.hibernate.LockOptions
import org.springframework.context.ApplicationContext
import java.util.*
Expand All @@ -24,7 +28,6 @@ open class ChunkProcessingUtil(
open fun processChunk() {
val time = measureTimeMillis {
try {
val processor = batchJobService.getProcessor<Any, Any>(job.type)
processor.process(job, toProcess, coroutineContext) {
if (it != toProcess.size) {
progressManager.publishSingleChunkProgress(job.id, it)
Expand Down Expand Up @@ -62,8 +65,7 @@ open class ChunkProcessingUtil(
execution.status = BatchJobChunkExecutionStatus.FAILED
execution.errorMessage = (exception as? ExceptionWithMessage)?.tolgeeMessage

Sentry.captureException(exception)
logger.error(exception.message, exception)
logException(exception)

if (exception is ChunkFailedException) {
successfulTargets = exception.successfulTargets
Expand All @@ -77,6 +79,18 @@ open class ChunkProcessingUtil(
retryFailedExecution(exception)
}

private fun logException(exception: Throwable) {
val knownCauses = listOf(
OutOfCreditsException::class.java, TranslationApiRateLimitException::class.java
)

val isKnownCause = knownCauses.any { ExceptionUtils.indexOfType(exception, it) > -1 }
if (!isKnownCause) {
Sentry.captureException(exception)
logger.error(exception.message, exception)
}
}

private fun retryFailedExecution(exception: Throwable) {
var maxRetries = job.type.maxRetries
var waitTime = job.type.defaultRetryWaitTimeInMs
Expand All @@ -86,13 +100,13 @@ open class ChunkProcessingUtil(
waitTime = getWaitTime(exception)
}

if (retries >= maxRetries) {
logger.debug("Max retries reached for job execution $execution")
if (retries >= maxRetries && maxRetries != -1) {
logger.debug("Max retries reached for job execution ${execution.id}")
Sentry.captureException(exception)
return
}

logger.debug("Retrying job execution $execution in ${waitTime}ms")
logger.debug("Retrying job execution ${execution.id} in ${waitTime}ms")
retryExecution.executeAfter = Date(waitTime + currentDateProvider.date.time)
execution.retry = true
}
Expand Down Expand Up @@ -122,17 +136,44 @@ open class ChunkProcessingUtil(
applicationContext.getBean(ProgressManager::class.java)
}

private var successfulTargets: List<Long>? = null
private val processor by lazy {
batchJobService.getProcessor(job.type)
}

private var successfulTargets: List<Any>? = null

private val toProcess by lazy {
val chunked = job.chunkedTarget
val chunk = chunked[execution.chunkNumber]
val previousSuccessfulTargets = previousExecutions.flatMap { it.successTargets }.toSet()
val toProcess = chunk.toMutableSet()
toProcess.removeAll(previousSuccessfulTargets)
toProcess.toList()
}

private val previousSuccessfulTargets by lazy {
previousExecutions.flatMap {
// this is important!!
// we want the equals check to be run on the correct type with correct class instances
convertChunkToItsType(it.successTargets)
}.toSet()
}

/**
* We need to convert the chunk to the right type, so we pass it to the processor correctly
*
* e.g. It can happen that the chunk is converted to a list of integers for caching, but
* we actually need a list of Long
*/
private val chunk by lazy {
val chunked = job.chunkedTarget
val chunk = chunked[execution.chunkNumber]
convertChunkToItsType(chunk)
}

private fun convertChunkToItsType(chunk: List<Any>): List<Any> {
val type =
jacksonObjectMapper().typeFactory.constructCollectionType(List::class.java, processor.getTargetItemType())
return jacksonObjectMapper().convertValue(chunk, type) as List<Any>
}

val retryExecution: BatchJobChunkExecution by lazy {
BatchJobChunkExecution().apply {
batchJob = entityManager.getReference(execution.batchJob::class.java, job.id)
Expand Down
14 changes: 11 additions & 3 deletions backend/data/src/main/kotlin/io/tolgee/batch/ChunkProcessor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,22 @@ package io.tolgee.batch
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import kotlin.coroutines.CoroutineContext

interface ChunkProcessor<RequestType, ParamsType> {
fun process(job: BatchJobDto, chunk: List<Long>, coroutineContext: CoroutineContext, onProgress: ((Int) -> Unit))
fun getTarget(data: RequestType): List<Long>
interface ChunkProcessor<RequestType, ParamsType, TargetItemType> {
fun process(
job: BatchJobDto,
chunk: List<TargetItemType>,
coroutineContext: CoroutineContext,
onProgress: ((Int) -> Unit)
)

fun getTarget(data: RequestType): List<TargetItemType>
fun getParams(data: RequestType): ParamsType

fun getParams(job: BatchJobDto): ParamsType {
return jacksonObjectMapper().convertValue(job.params, getParamsType())
}

fun getParamsType(): Class<ParamsType>?

fun getTargetItemType(): Class<TargetItemType>
}
6 changes: 3 additions & 3 deletions backend/data/src/main/kotlin/io/tolgee/batch/exceptions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,20 @@ import io.tolgee.exceptions.ExceptionWithMessage

open class ChunkFailedException(
message: Message,
val successfulTargets: List<Long>,
val successfulTargets: List<Any>,
override val cause: Throwable
) :
ExceptionWithMessage(message)

open class FailedDontRequeueException(
message: Message,
successfulTargets: List<Long>,
successfulTargets: List<Any>,
cause: Throwable
) : ChunkFailedException(message, successfulTargets, cause)

open class RequeueWithDelayException(
message: Message,
successfulTargets: List<Long>,
successfulTargets: List<Any>,
cause: Throwable,
val delayInMs: Int = 100,
val increaseFactor: Int = 10,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import kotlin.coroutines.CoroutineContext
class ClearTranslationsChunkProcessor(
private val translationService: TranslationService,
private val entityManager: EntityManager
) : ChunkProcessor<ClearTranslationsRequest, ClearTranslationsJobParams> {
) : ChunkProcessor<ClearTranslationsRequest, ClearTranslationsJobParams, Long> {
override fun process(
job: BatchJobDto,
chunk: List<Long>,
Expand All @@ -26,6 +26,7 @@ class ClearTranslationsChunkProcessor(
val params = getParams(job)
subChunked.forEach { subChunk ->
coroutineContext.ensureActive()
@Suppress("UNCHECKED_CAST")
translationService.clear(subChunk, params.languageIds)
entityManager.flush()
progress += subChunk.size
Expand All @@ -37,6 +38,10 @@ class ClearTranslationsChunkProcessor(
return ClearTranslationsJobParams::class.java
}

override fun getTargetItemType(): Class<Long> {
return Long::class.java
}

override fun getTarget(data: ClearTranslationsRequest): List<Long> {
return data.keyIds
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import kotlin.coroutines.CoroutineContext
class CopyTranslationsChunkProcessor(
private val translationService: TranslationService,
private val entityManager: EntityManager
) : ChunkProcessor<CopyTranslationRequest, CopyTranslationJobParams> {
) : ChunkProcessor<CopyTranslationRequest, CopyTranslationJobParams, Long> {
override fun process(
job: BatchJobDto,
chunk: List<Long>,
Expand All @@ -41,6 +41,10 @@ class CopyTranslationsChunkProcessor(
return data.keyIds
}

override fun getTargetItemType(): Class<Long> {
return Long::class.java
}

override fun getParams(data: CopyTranslationRequest): CopyTranslationJobParams {
return CopyTranslationJobParams().apply {
sourceLanguageId = data.sourceLanguageId
Expand Down
Loading

0 comments on commit 2896866

Please sign in to comment.