Skip to content

Commit

Permalink
[JBPM-10088] Removing timers when rollback (#2365)
Browse files Browse the repository at this point in the history
* [JBPM-10088] Removing timers when rollback

* [JBPM-10088] Dealing with transactions in different way

* [JBPM-10088] Set to no transactional

* Revert "[JBPM-10088] Set to no transactional"

This reverts commit 61b735b.

* [JBPM-10088] Setting CMT property in environment

* [JBPM-10088] REmoving new transaction created while querying during
rollback

* [JBPM-10088] Do not create transaction for regular timer task

* [JBPM-10088] Sonar comments

* [JBPM-10088] Some test failed

* [JBPM-10088] Removing unneeded casting
  • Loading branch information
fjtirado committed Dec 1, 2023
1 parent 6eafd50 commit 6218b86
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.drools.core.time.TimerService;
import org.drools.core.time.impl.TimerJobInstance;
import org.jbpm.process.core.timer.impl.GlobalTimerService.GlobalJobHandle;
import org.kie.internal.runtime.manager.RuntimeEnvironment;

/**
* Implementations of these interface are responsible for scheduled jobs in global manner,
Expand Down Expand Up @@ -87,4 +88,6 @@ default void invalidate(JobHandle jobHandle) {
default TimerJobInstance getTimerJobInstance(long processInstanceId, long timerId) {
return null;
}

default void setEnvironment(RuntimeEnvironment environment) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ public DefaultRuntimeEnvironment(EntityManagerFactory emf, GlobalSchedulerServic
this.usePersistence = true;
this.userGroupCallback = UserDataServiceProvider.getUserGroupCallback();
this.userInfo = UserDataServiceProvider.getUserInfo();
if (globalSchedulerService != null) {
globalSchedulerService.setEnvironment(this);
}
}

public DefaultRuntimeEnvironment(EntityManagerFactory emf, boolean usePersistence) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import javax.ejb.Timer;
import javax.ejb.TimerConfig;
import javax.ejb.TimerHandle;
import javax.ejb.TimerService;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

Expand Down Expand Up @@ -73,7 +74,7 @@ public class EJBTimerScheduler {
private ConcurrentMap<String, TimerJobInstance> localCache = new ConcurrentHashMap<String, TimerJobInstance>();

@Resource
protected javax.ejb.TimerService timerService;
protected TimerService timerService;

@Resource
protected SessionContext ctx;
Expand Down Expand Up @@ -111,7 +112,7 @@ public void executeTimerJob(Timer timer) {
Thread.currentThread().interrupt();
}
try {
invokeTransaction(this::executeTimerJobInstance, timerJobInstance);
executeTimerJobInstance(timerJobInstance);
} catch (Exception e) {
recoverTimerJobInstance(timerJob, timer, e);
}
Expand Down Expand Up @@ -186,7 +187,12 @@ private interface Transaction<I> {

@TransactionAttribute(value = TransactionAttributeType.REQUIRES_NEW)
public <I> void transaction(Transaction<I> operation, I item) throws Exception {
operation.doWork(item);
try {
operation.doWork(item);
} catch (Exception transactionEx) {
ctx.setRollbackOnly();
throw transactionEx;
}
}

private <I> void invokeTransaction (Transaction<I> operation, I item) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,50 +28,42 @@
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;

import org.drools.core.time.InternalSchedulerService;
import org.drools.core.time.Job;
import org.drools.core.time.JobContext;
import org.drools.core.time.JobHandle;
import org.drools.core.time.TimerService;
import org.drools.core.time.Trigger;
import org.drools.core.time.impl.TimerJobInstance;
import org.drools.persistence.api.TransactionManager;
import org.drools.persistence.api.TransactionManagerFactory;
import org.drools.persistence.jta.JtaTransactionManager;
import org.jbpm.process.core.timer.GlobalSchedulerService;
import org.jbpm.process.core.timer.JobNameHelper;
import org.jbpm.process.core.timer.NamedJobContext;
import org.jbpm.process.core.timer.SchedulerServiceInterceptor;
import org.jbpm.process.core.timer.impl.DelegateSchedulerServiceInterceptor;
import org.jbpm.process.core.timer.impl.GlobalTimerService;
import org.jbpm.process.core.timer.impl.GlobalTimerService.GlobalJobHandle;
import org.jbpm.process.instance.timer.TimerManager.ProcessJobContext;
import org.jbpm.runtime.manager.impl.SimpleRuntimeEnvironment;
import org.jbpm.runtime.manager.impl.jpa.EntityManagerFactoryManager;
import org.jbpm.runtime.manager.impl.jpa.TimerMappingInfo;
import org.kie.internal.runtime.manager.InternalRuntimeManager;
import org.kie.internal.runtime.manager.RuntimeEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class EjbSchedulerService implements GlobalSchedulerService {
private static final Logger logger = LoggerFactory.getLogger(EjbSchedulerService.class);

private static final Boolean TRANSACTIONAL = Boolean.parseBoolean(System.getProperty("org.jbpm.ejb.timer.tx", "true"));

private AtomicLong idCounter = new AtomicLong();
private TimerService globalTimerService;
private GlobalTimerService globalTimerService;
private EJBTimerScheduler scheduler;

private SchedulerServiceInterceptor interceptor = new DelegateSchedulerServiceInterceptor(this);


@Override
public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) {
long id = idCounter.getAndIncrement();
String jobName = getJobName(ctx, id);
EjbGlobalJobHandle jobHandle = new EjbGlobalJobHandle(id, jobName, ((GlobalTimerService) globalTimerService).getTimerServiceId());
EjbGlobalJobHandle jobHandle = new EjbGlobalJobHandle(id, jobName, globalTimerService.getTimerServiceId());

TimerJobInstance jobInstance = null;
// check if given timer job is marked as new timer meaning it was never scheduled before,
Expand All @@ -90,7 +82,7 @@ public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) {
ctx,
trigger,
jobHandle,
(InternalSchedulerService) globalTimerService);
globalTimerService);

jobHandle.setTimerJobInstance((TimerJobInstance) jobInstance);
interceptor.internalSchedule(jobInstance);
Expand All @@ -101,10 +93,6 @@ public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) {
public boolean removeJob(JobHandle jobHandle) {
String uuid = ((EjbGlobalJobHandle) jobHandle).getUuid();
final Timer ejbTimer = getEjbTimer(getTimerMappinInfo(uuid));
if (TRANSACTIONAL && ejbTimer == null) {
logger.warn("EJB timer is null for uuid {} and transactional flag is enabled", uuid);
return false;
}
boolean result = scheduler.removeJob(jobHandle, ejbTimer);
logger.debug("Remove job returned {}", result);
return result;
Expand All @@ -114,7 +102,6 @@ private TimerJobInstance getTimerJobInstance (String uuid) {
return unwrapTimerJobInstance(getEjbTimer(getTimerMappinInfo(uuid)));
}


@Override
public TimerJobInstance getTimerJobInstance(long processInstanceId, long timerId) {
return unwrapTimerJobInstance(getEjbTimer(getTimerMappinInfo(processInstanceId, timerId)));
Expand All @@ -125,10 +112,9 @@ private Timer getEjbTimer(TimerMappingInfo timerMappingInfo) {
if(timerMappingInfo == null || timerMappingInfo.getInfo() == null) {
return null;
}
byte[] data = timerMappingInfo.getInfo();
return ((TimerHandle) new ObjectInputStream(new ByteArrayInputStream(data)).readObject()).getTimer();
return ((TimerHandle) new ObjectInputStream(new ByteArrayInputStream(timerMappingInfo.getInfo())).readObject()).getTimer();
} catch (Exception e) {
logger.warn("wast not able to deserialize info field from timer info for uuid");
logger.warn("Problem retrieving timer for uuid {}", timerMappingInfo.getUuid(), e);
return null;
}
}
Expand All @@ -147,30 +133,17 @@ private TimerMappingInfo getTimerMappinInfo(long processInstanceId, long timerId
}

private TimerMappingInfo getTimerMappingInfo(Function<EntityManager, List<TimerMappingInfo>> func) {
InternalRuntimeManager manager = ((GlobalTimerService) globalTimerService).getRuntimeManager();
String pu = ((InternalRuntimeManager) manager).getDeploymentDescriptor().getPersistenceUnit();
EntityManagerFactory emf = EntityManagerFactoryManager.get().getOrCreate(pu);
EntityManager em = emf.createEntityManager();
JtaTransactionManager tm = (JtaTransactionManager) TransactionManagerFactory.get().newTransactionManager();
boolean txOwner = false;
EntityManager em = EntityManagerFactoryManager.get()
.getOrCreate(globalTimerService.getRuntimeManager()
.getDeploymentDescriptor().getPersistenceUnit())
.createEntityManager();
try {
if (tm != null && tm.getStatus() == TransactionManager.STATUS_ROLLEDBACK) {
txOwner = tm.begin();
}
List<TimerMappingInfo> info = func.apply(em);
if (!info.isEmpty()) {
return info.get(0);
} else {
return null;
}

return !info.isEmpty() ? info.get(0) : null;
} catch (Exception ex) {
logger.warn("Error getting mapping info ",ex);
return null;
} finally {
if (tm != null) {
tm.commit(txOwner);
}
em.close();
}
}
Expand Down Expand Up @@ -201,7 +174,7 @@ public void internalSchedule(TimerJobInstance timerJobInstance) {

@Override
public void initScheduler(TimerService timerService) {
this.globalTimerService = timerService;
this.globalTimerService = (GlobalTimerService)timerService;
try {
this.scheduler = InitialContext.doLookup("java:module/EJBTimerScheduler");
} catch (NamingException e) {
Expand All @@ -212,18 +185,17 @@ public void initScheduler(TimerService timerService) {
@Override
public void shutdown() {
// managed by container - no op

}

@Override
public JobHandle buildJobHandleForContext(NamedJobContext ctx) {

return new EjbGlobalJobHandle(-1, getJobName(ctx, -1L), ((GlobalTimerService) globalTimerService).getTimerServiceId());
return new EjbGlobalJobHandle(-1, getJobName(ctx, -1L), globalTimerService.getTimerServiceId());
}

@Override
public boolean isTransactional() {
return TRANSACTIONAL;
return true;
}

@Override
Expand All @@ -238,16 +210,18 @@ public void setInterceptor(SchedulerServiceInterceptor interceptor) {

@Override
public boolean isValid(GlobalJobHandle jobHandle) {

return true;
return true;
}

protected String getJobName(JobContext ctx, long id) {
return JobNameHelper.getJobName(ctx, id);
}

private boolean isNewTimer(JobContext ctx) {
return ctx instanceof ProcessJobContext && ((ProcessJobContext) ctx).isNewTimer();
}


@Override
public
void setEnvironment(RuntimeEnvironment environment) {
if (environment instanceof SimpleRuntimeEnvironment) {
((SimpleRuntimeEnvironment)environment).addToEnvironment("IS_TIMER_CMT", true);
}
}
}

0 comments on commit 6218b86

Please sign in to comment.