Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JBPM-10088] Removing timers when rollback #2365

Merged
merged 10 commits into from
Dec 1, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.drools.core.time.TimerService;
import org.drools.core.time.impl.TimerJobInstance;
import org.jbpm.process.core.timer.impl.GlobalTimerService.GlobalJobHandle;
import org.kie.internal.runtime.manager.RuntimeEnvironment;

/**
* Implementations of these interface are responsible for scheduled jobs in global manner,
Expand Down Expand Up @@ -87,4 +88,6 @@ default void invalidate(JobHandle jobHandle) {
default TimerJobInstance getTimerJobInstance(long processInstanceId, long timerId) {
return null;
}

default void setEnvironment(RuntimeEnvironment environment) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ public DefaultRuntimeEnvironment(EntityManagerFactory emf, GlobalSchedulerServic
this.usePersistence = true;
this.userGroupCallback = UserDataServiceProvider.getUserGroupCallback();
this.userInfo = UserDataServiceProvider.getUserInfo();
if (globalSchedulerService != null) {
globalSchedulerService.setEnvironment(this);
}
}

public DefaultRuntimeEnvironment(EntityManagerFactory emf, boolean usePersistence) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import javax.ejb.Timer;
import javax.ejb.TimerConfig;
import javax.ejb.TimerHandle;
import javax.ejb.TimerService;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

Expand Down Expand Up @@ -73,7 +74,7 @@ public class EJBTimerScheduler {
private ConcurrentMap<String, TimerJobInstance> localCache = new ConcurrentHashMap<String, TimerJobInstance>();

@Resource
protected javax.ejb.TimerService timerService;
protected TimerService timerService;

@Resource
protected SessionContext ctx;
Expand Down Expand Up @@ -111,7 +112,7 @@ public void executeTimerJob(Timer timer) {
Thread.currentThread().interrupt();
}
try {
invokeTransaction(this::executeTimerJobInstance, timerJobInstance);
executeTimerJobInstance(timerJobInstance);
Copy link
Contributor Author

@fjtirado fjtirado Nov 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not need to create a new transaction here. Also PersistableRunner is on the middle, handling the transaction itself. Because that transaction is not longer usable once PersistableRunner commits it, we need to create a new one to execute recoverTimerJob.

Copy link
Member

@gmunozfe gmunozfe Nov 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before this was provoking an exception at setRollbackOnly call in the new end-2-end test, because there was no transaction

} catch (Exception e) {
recoverTimerJobInstance(timerJob, timer, e);
}
Expand Down Expand Up @@ -186,7 +187,12 @@ private interface Transaction<I> {

@TransactionAttribute(value = TransactionAttributeType.REQUIRES_NEW)
public <I> void transaction(Transaction<I> operation, I item) throws Exception {
operation.doWork(item);
try {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The transaction should be rollback if there is any exception

operation.doWork(item);
} catch (Exception transactionEx) {
ctx.setRollbackOnly();
throw transactionEx;
}
}

private <I> void invokeTransaction (Transaction<I> operation, I item) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,49 +28,42 @@
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;

import org.drools.core.time.InternalSchedulerService;
import org.drools.core.time.Job;
import org.drools.core.time.JobContext;
import org.drools.core.time.JobHandle;
import org.drools.core.time.TimerService;
import org.drools.core.time.Trigger;
import org.drools.core.time.impl.TimerJobInstance;
import org.drools.persistence.api.TransactionManager;
import org.drools.persistence.api.TransactionManagerFactory;
import org.drools.persistence.jta.JtaTransactionManager;
import org.jbpm.process.core.timer.GlobalSchedulerService;
import org.jbpm.process.core.timer.JobNameHelper;
import org.jbpm.process.core.timer.NamedJobContext;
import org.jbpm.process.core.timer.SchedulerServiceInterceptor;
import org.jbpm.process.core.timer.impl.DelegateSchedulerServiceInterceptor;
import org.jbpm.process.core.timer.impl.GlobalTimerService;
import org.jbpm.process.core.timer.impl.GlobalTimerService.GlobalJobHandle;
import org.jbpm.runtime.manager.impl.SimpleRuntimeEnvironment;
import org.jbpm.runtime.manager.impl.jpa.EntityManagerFactoryManager;
import org.jbpm.runtime.manager.impl.jpa.TimerMappingInfo;
import org.kie.internal.runtime.manager.InternalRuntimeManager;
import org.kie.internal.runtime.manager.RuntimeEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class EjbSchedulerService implements GlobalSchedulerService {
private static final Logger logger = LoggerFactory.getLogger(EjbSchedulerService.class);

private static final Boolean TRANSACTIONAL = Boolean.parseBoolean(System.getProperty("org.jbpm.ejb.timer.tx", "true"));

private AtomicLong idCounter = new AtomicLong();
private TimerService globalTimerService;
private GlobalTimerService globalTimerService;
private EJBTimerScheduler scheduler;

private SchedulerServiceInterceptor interceptor = new DelegateSchedulerServiceInterceptor(this);


@Override
public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) {
long id = idCounter.getAndIncrement();
String jobName = getJobName(ctx, id);
EjbGlobalJobHandle jobHandle = new EjbGlobalJobHandle(id, jobName, ((GlobalTimerService) globalTimerService).getTimerServiceId());
EjbGlobalJobHandle jobHandle = new EjbGlobalJobHandle(id, jobName, globalTimerService.getTimerServiceId());

TimerJobInstance jobInstance = null;
// check if given timer job is marked as new timer meaning it was never scheduled before,
Expand All @@ -89,7 +82,7 @@ public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) {
ctx,
trigger,
jobHandle,
(InternalSchedulerService) globalTimerService);
globalTimerService);

jobHandle.setTimerJobInstance((TimerJobInstance) jobInstance);
interceptor.internalSchedule(jobInstance);
Expand All @@ -100,10 +93,6 @@ public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) {
public boolean removeJob(JobHandle jobHandle) {
String uuid = ((EjbGlobalJobHandle) jobHandle).getUuid();
final Timer ejbTimer = getEjbTimer(getTimerMappinInfo(uuid));
if (TRANSACTIONAL && ejbTimer == null) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timer might be expired (getEJBTimer will return null) and we still want try to remove it just in case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

logger.warn("EJB timer is null for uuid {} and transactional flag is enabled", uuid);
return false;
}
boolean result = scheduler.removeJob(jobHandle, ejbTimer);
logger.debug("Remove job returned {}", result);
return result;
Expand All @@ -113,7 +102,6 @@ private TimerJobInstance getTimerJobInstance (String uuid) {
return unwrapTimerJobInstance(getEjbTimer(getTimerMappinInfo(uuid)));
}


@Override
public TimerJobInstance getTimerJobInstance(long processInstanceId, long timerId) {
return unwrapTimerJobInstance(getEjbTimer(getTimerMappinInfo(processInstanceId, timerId)));
Expand All @@ -124,10 +112,9 @@ private Timer getEjbTimer(TimerMappingInfo timerMappingInfo) {
if(timerMappingInfo == null || timerMappingInfo.getInfo() == null) {
return null;
}
byte[] data = timerMappingInfo.getInfo();
return ((TimerHandle) new ObjectInputStream(new ByteArrayInputStream(data)).readObject()).getTimer();
return ((TimerHandle) new ObjectInputStream(new ByteArrayInputStream(timerMappingInfo.getInfo())).readObject()).getTimer();
} catch (Exception e) {
logger.warn("wast not able to deserialize info field from timer info for uuid");
logger.warn("Problem retrieving timer for uuid {}", timerMappingInfo.getUuid(), e);
return null;
}
}
Expand All @@ -146,30 +133,17 @@ private TimerMappingInfo getTimerMappinInfo(long processInstanceId, long timerId
}

private TimerMappingInfo getTimerMappingInfo(Function<EntityManager, List<TimerMappingInfo>> func) {
InternalRuntimeManager manager = ((GlobalTimerService) globalTimerService).getRuntimeManager();
Copy link
Contributor Author

@fjtirado fjtirado Nov 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When using CMT, I do not think we need to create a different transaction here anymore

String pu = ((InternalRuntimeManager) manager).getDeploymentDescriptor().getPersistenceUnit();
EntityManagerFactory emf = EntityManagerFactoryManager.get().getOrCreate(pu);
EntityManager em = emf.createEntityManager();
JtaTransactionManager tm = (JtaTransactionManager) TransactionManagerFactory.get().newTransactionManager();
boolean txOwner = false;
EntityManager em = EntityManagerFactoryManager.get()
.getOrCreate(globalTimerService.getRuntimeManager()
.getDeploymentDescriptor().getPersistenceUnit())
.createEntityManager();
try {
if (tm != null && tm.getStatus() == TransactionManager.STATUS_ROLLEDBACK) {
txOwner = tm.begin();
}
List<TimerMappingInfo> info = func.apply(em);
if (!info.isEmpty()) {
return info.get(0);
} else {
return null;
}

return !info.isEmpty() ? info.get(0) : null;
} catch (Exception ex) {
logger.warn("Error getting mapping info ",ex);
return null;
} finally {
if (tm != null) {
tm.commit(txOwner);
}
em.close();
}
}
Expand Down Expand Up @@ -200,7 +174,7 @@ public void internalSchedule(TimerJobInstance timerJobInstance) {

@Override
public void initScheduler(TimerService timerService) {
this.globalTimerService = timerService;
this.globalTimerService = (GlobalTimerService)timerService;
try {
this.scheduler = InitialContext.doLookup("java:module/EJBTimerScheduler");
} catch (NamingException e) {
Expand All @@ -211,18 +185,17 @@ public void initScheduler(TimerService timerService) {
@Override
public void shutdown() {
// managed by container - no op

}

@Override
public JobHandle buildJobHandleForContext(NamedJobContext ctx) {

return new EjbGlobalJobHandle(-1, getJobName(ctx, -1L), ((GlobalTimerService) globalTimerService).getTimerServiceId());
return new EjbGlobalJobHandle(-1, getJobName(ctx, -1L), globalTimerService.getTimerServiceId());
}

@Override
public boolean isTransactional() {
return TRANSACTIONAL;
return true;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is not point in keeping the variable. CMT is always transactional.

}

@Override
Expand All @@ -237,11 +210,18 @@ public void setInterceptor(SchedulerServiceInterceptor interceptor) {

@Override
public boolean isValid(GlobalJobHandle jobHandle) {

return true;
return true;
}

protected String getJobName(JobContext ctx, long id) {
return JobNameHelper.getJobName(ctx, id);
}

@Override
public
void setEnvironment(RuntimeEnvironment environment) {
if (environment instanceof SimpleRuntimeEnvironment) {
((SimpleRuntimeEnvironment)environment).addToEnvironment("IS_TIMER_CMT", true);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to tell GlobalJpaTimerJobInsance that we are using CMT

}
}
}
Loading