Skip to content

Commit

Permalink
[JBPM-10209] A bit of refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Nov 14, 2023
1 parent 0b419a1 commit e497bc9
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public class EJBTimerScheduler {
private static final Integer TIMER_RETRY_LIMIT = Integer.parseInt(System.getProperty("org.kie.jbpm.timer.retry.limit", "3"));

private static final Integer OVERDUE_WAIT_TIME = Integer.parseInt(System.getProperty("org.jbpm.overdue.timer.wait", "20000"));

private static final Integer OVERDUE_CHECK_TIME = Integer.parseInt(System.getProperty("org.jbpm.overdue.timer.check", "200"));

private boolean useLocalCache = Boolean.parseBoolean(System.getProperty("org.jbpm.ejb.timer.local.cache", "false"));

Expand Down Expand Up @@ -92,30 +94,24 @@ public void executeTimerJob(Timer timer) {
EjbTimerJob timerJob = (EjbTimerJob) timer.getInfo();
TimerJobInstance timerJobInstance = timerJob.getTimerJobInstance();
logger.debug("About to execute timer for job {}", timerJob);

String timerServiceId = ((EjbGlobalJobHandle) timerJobInstance.getJobHandle()).getDeploymentId();

// handle overdue timers as ejb timer service might start before all deployments are ready
long time = 0;
while (TimerServiceRegistry.getInstance().get(timerServiceId) == null) {
logger.debug("waiting for timer service to be available, elapsed time {} ms", time);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
time += 500;

if (time > OVERDUE_WAIT_TIME) {
logger.debug("No timer service found after waiting {} ms", time);
break;
try {
while (TimerServiceRegistry.getInstance().get(((EjbGlobalJobHandle) timerJobInstance.getJobHandle()).getDeploymentId()) == null) {
logger.debug("waiting for timer service to be available, elapsed time {} ms", time);
Thread.sleep(OVERDUE_CHECK_TIME);
time += OVERDUE_CHECK_TIME;
if (time > OVERDUE_WAIT_TIME) {
logger.debug("No timer service found after waiting {} ms", time);
break;
}
}
}
} catch (InterruptedException e) {
logger.warn("Thread has been interrupted", e);
Thread.currentThread().interrupt();
}
try {
invokeTransaction(this::executeTimerJobInstance, timerJobInstance);
} catch (SessionNotFoundException e) {
logger.warn("Process instance is not found. More likely already completed. Timer {} won't be recovered", timerJobInstance, e);
removeUnrecoverableTimer(timerJob, timer);
} catch (Exception e) {
recoverTimerJobInstance(timerJob, timer, e);
}
Expand All @@ -125,75 +121,51 @@ private void executeTimerJobInstance(TimerJobInstance timerJobInstance) throws E
((Callable<?>) timerJobInstance).call();
}

private void removeUnrecoverableTimer(EjbTimerJob ejbTimerJob, Timer timer) {
try {
Transaction<TimerJobInstance> tx = timerJobInstance -> {
if (!this.removeJob(timerJobInstance.getJobHandle(), timer)) {
logger.warn("Session not found for timer {}. Timer could not removed.", ejbTimerJob.getTimerJobInstance());
}
};
invokeTransaction(tx, ejbTimerJob.getTimerJobInstance());
} catch (Exception e1) {
logger.warn("There was a problem during timer removal {}", ejbTimerJob.getTimerJobInstance(), e1);
}
}

private void recoverTimerJobInstance(EjbTimerJob ejbTimerJob, Timer timer, Exception e) {
if (isSessionNotFound(e)) {
logger.warn("Trying to recover timer. Not possible due to process instance is not found. More likely already completed. Timer {} won't be recovered", ejbTimerJob.getTimerJobInstance(), e);
private void recoverTimerJobInstance(EjbTimerJob ejbTimerJob, Timer timer, Exception cause) {
Transaction<TimerJobInstance> tx;
if (isSessionNotFound(cause)) {
// if session is not found means the process has already finished. In this case we just need to remove
// the timer and avoid any recovery as it should not trigger any more timers.
removeUnrecoverableTimer(ejbTimerJob, timer);
return;
tx = timerJobInstance -> {
logger.warn("Trying to recover timer. Not possible due to process instance is not found. More likely already completed. Timer {} won't be recovered", timerJobInstance, cause);
if (!removeJob(timerJobInstance.getJobHandle(), timer)) {
logger.warn("Session not found for timer {}. Timer could not removed.", timerJobInstance);
}
};
}

if (ejbTimerJob.getTimerJobInstance().getTrigger().hasNextFireTime() != null) {
else if (ejbTimerJob.getTimerJobInstance().getTrigger().hasNextFireTime() != null) {
// this is an interval trigger. Problem here is that the timer scheduled by DefaultTimerJobInstance is lost
// because of the transaction, so we need to do this here.
try {

logger.warn("Execution of time failed Interval Trigger failed. Skipping {}", ejbTimerJob.getTimerJobInstance());
Transaction<TimerJobInstance> tx = timerJobInstance -> {
if (this.removeJob(timerJobInstance.getJobHandle(), null)) {
this.internalSchedule(timerJobInstance);
} else {
logger.debug("Interval trigger {} was removed before rescheduling", ejbTimerJob.getTimerJobInstance());
}
};
invokeTransaction(tx, ejbTimerJob.getTimerJobInstance());
} catch (Exception e1) {
logger.warn("Could not schedule the interval trigger {}", ejbTimerJob.getTimerJobInstance(), e1);
}
return;
tx = timerJobInstance -> {
logger.warn("Execution of time failed Interval Trigger failed. Skipping {}", timerJobInstance);
if (removeJob(timerJobInstance.getJobHandle(), null)) {
internalSchedule(timerJobInstance);
} else {
logger.debug("Interval trigger {} was removed before rescheduling", timerJobInstance);
}
};
}
else {
// if there is not next date to be fired, we need to apply policy otherwise will be lost
tx = timerJobInstance -> {
logger.warn("Execution of time failed. The timer will be retried {}", timerJobInstance);
ZonedDateTime nextRetry = ZonedDateTime.now().plus(TIMER_RETRY_INTERVAL, ChronoUnit.MILLIS);
EjbTimerJobRetry info = ejbTimerJob instanceof EjbTimerJobRetry ? ((EjbTimerJobRetry) ejbTimerJob).next() : new EjbTimerJobRetry(timerJobInstance);
if (TIMER_RETRY_LIMIT > 0 && info.getRetry() > TIMER_RETRY_LIMIT) {
logger.warn("The timer {} reached retry limit {}. It won't be retried again", timerJobInstance, TIMER_RETRY_LIMIT);
} else {
TimerConfig config = new TimerConfig(info, true);
Timer newTimer = timerService.createSingleActionTimer(Date.from(nextRetry.toInstant()), config);
((GlobalJpaTimerJobInstance) timerJobInstance).setTimerInfo(newTimer.getHandle());
((GlobalJpaTimerJobInstance) timerJobInstance).setExternalTimerId(getPlatformTimerId(newTimer));
}
};
}

// if there is not next date to be fired, we need to apply policy otherwise will be lost

logger.warn("Execution of time failed. The timer will be retried {}", ejbTimerJob.getTimerJobInstance());
Transaction<TimerJobInstance> operation = (instance) -> {
ZonedDateTime nextRetry = ZonedDateTime.now().plus(TIMER_RETRY_INTERVAL, ChronoUnit.MILLIS);
EjbTimerJobRetry info = null;
if(ejbTimerJob instanceof EjbTimerJobRetry) {
info = ((EjbTimerJobRetry) ejbTimerJob).next();
} else {
info = new EjbTimerJobRetry(instance);
}
if (TIMER_RETRY_LIMIT > 0 && info.getRetry() > TIMER_RETRY_LIMIT) {
logger.warn("The timer {} reached retry limit {}. It won't be retried again", instance, TIMER_RETRY_LIMIT);
return;
}
TimerConfig config = new TimerConfig(info, true);
Timer newTimer = timerService.createSingleActionTimer(Date.from(nextRetry.toInstant()), config);
TimerHandle handler = newTimer.getHandle();
((GlobalJpaTimerJobInstance) ejbTimerJob.getTimerJobInstance()).setTimerInfo(handler);
((GlobalJpaTimerJobInstance) ejbTimerJob.getTimerJobInstance()).setExternalTimerId(getPlatformTimerId(newTimer));
};
try {
invokeTransaction (operation, ejbTimerJob.getTimerJobInstance());
} catch (Exception e1) {
logger.error("Failed to executed timer recovery {}", e1.getMessage(), e1);
invokeTransaction (tx, ejbTimerJob.getTimerJobInstance());
} catch (Exception e) {
logger.error("Failed to executed timer recovery", e);
}

}

private boolean isSessionNotFound(Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.drools.core.time.impl.TimerJobInstance;
import org.drools.persistence.api.TransactionManager;
import org.drools.persistence.api.TransactionManagerFactory;
import org.drools.persistence.api.TransactionSynchronization;
import org.drools.persistence.jta.JtaTransactionManager;
import org.jbpm.process.core.timer.GlobalSchedulerService;
import org.jbpm.process.core.timer.JobNameHelper;
Expand Down

0 comments on commit e497bc9

Please sign in to comment.