diff --git a/jbpm-flow/src/main/java/org/jbpm/process/core/DisposableRuntimeEngine.java b/jbpm-flow/src/main/java/org/jbpm/process/core/DisposableRuntimeEngine.java new file mode 100644 index 0000000000..2196fc5502 --- /dev/null +++ b/jbpm-flow/src/main/java/org/jbpm/process/core/DisposableRuntimeEngine.java @@ -0,0 +1,24 @@ +/* + * Copyright 2023 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.jbpm.process.core; + +import org.kie.internal.runtime.manager.Disposable; +import org.kie.internal.runtime.manager.InternalRuntimeEngine; + +public interface DisposableRuntimeEngine extends InternalRuntimeEngine, Disposable { + boolean isDisposed(); +} diff --git a/jbpm-flow/src/main/java/org/jbpm/process/core/timer/impl/GlobalTimerService.java b/jbpm-flow/src/main/java/org/jbpm/process/core/timer/impl/GlobalTimerService.java index 2daa6b2cfd..2c013940f3 100644 --- a/jbpm-flow/src/main/java/org/jbpm/process/core/timer/impl/GlobalTimerService.java +++ b/jbpm-flow/src/main/java/org/jbpm/process/core/timer/impl/GlobalTimerService.java @@ -37,6 +37,7 @@ import org.drools.core.time.impl.DefaultJobHandle; import org.drools.core.time.impl.TimerJobFactoryManager; import org.drools.core.time.impl.TimerJobInstance; +import org.jbpm.process.core.DisposableRuntimeEngine; import org.jbpm.process.core.timer.GlobalSchedulerService; import org.jbpm.process.core.timer.NamedJobContext; import org.jbpm.process.instance.timer.TimerManager.ProcessJobContext; @@ -420,7 +421,10 @@ public Environment getEnvironment() { return runtime.getKieSession().getEnvironment(); } - + + public boolean isDisposed() { + return runtime instanceof DisposableRuntimeEngine && ((DisposableRuntimeEngine)runtime).isDisposed(); + } } public List getListeners() { diff --git a/jbpm-persistence/jbpm-persistence-jpa/src/main/java/org/jbpm/persistence/JpaProcessPersistenceContext.java b/jbpm-persistence/jbpm-persistence-jpa/src/main/java/org/jbpm/persistence/JpaProcessPersistenceContext.java index d9d74a90ae..8d85d02d9f 100644 --- a/jbpm-persistence/jbpm-persistence-jpa/src/main/java/org/jbpm/persistence/JpaProcessPersistenceContext.java +++ b/jbpm-persistence/jbpm-persistence-jpa/src/main/java/org/jbpm/persistence/JpaProcessPersistenceContext.java @@ -41,6 +41,7 @@ public class JpaProcessPersistenceContext extends JpaPersistenceContext implements ProcessPersistenceContext { + public JpaProcessPersistenceContext(EntityManager em, TransactionManager txm) { super( em, txm ); @@ -66,7 +67,10 @@ public PersistentProcessInstance findProcessInstanceInfo(Long processId) { if( this.pessimisticLocking ) { return em.find( ProcessInstanceInfo.class, processId, lockMode ); } - return em.find( ProcessInstanceInfo.class, processId ); + logger.trace("Reading process instance info {} with em {}", processId, em); + ProcessInstanceInfo pi = em.find(ProcessInstanceInfo.class, processId); + logger.trace("Process instance info read {}", pi); + return pi; } public void remove(PersistentProcessInstance processInstanceInfo) { diff --git a/jbpm-persistence/jbpm-persistence-jpa/src/main/java/org/jbpm/persistence/processinstance/ProcessInstanceInfo.java b/jbpm-persistence/jbpm-persistence-jpa/src/main/java/org/jbpm/persistence/processinstance/ProcessInstanceInfo.java index 723e4e672c..00696764e4 100644 --- a/jbpm-persistence/jbpm-persistence-jpa/src/main/java/org/jbpm/persistence/processinstance/ProcessInstanceInfo.java +++ b/jbpm-persistence/jbpm-persistence-jpa/src/main/java/org/jbpm/persistence/processinstance/ProcessInstanceInfo.java @@ -55,11 +55,14 @@ import org.jbpm.workflow.instance.impl.WorkflowProcessInstanceImpl; import org.kie.api.runtime.Environment; import org.kie.api.runtime.process.ProcessInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Entity @SequenceGenerator(name="processInstanceInfoIdSeq", sequenceName="PROCESS_INSTANCE_INFO_ID_SEQ") public class ProcessInstanceInfo implements PersistentProcessInstance { + private static final Logger logger = LoggerFactory.getLogger(ProcessInstanceInfo.class); @Id @GeneratedValue(strategy = GenerationType.AUTO, generator="processInstanceInfoIdSeq") @Column(name = "InstanceId") @@ -192,9 +195,10 @@ public ProcessInstance getProcessInstance(InternalKnowledgeRuntime kruntime, } context.close(); } catch ( IOException e ) { - e.printStackTrace(); - throw new IllegalArgumentException( "IOException while loading process instance: " + e.getMessage(), - e ); + throw new IllegalArgumentException( "IOException while loading process instance: " + e.getMessage(), e); + } catch (RuntimeException e) { + logger.error("Error unmarshalling process instance info {}", processInstanceId, e); + throw e; } } ((WorkflowProcessInstanceImpl) processInstance).internalSetStartDate(this.startDate); diff --git a/jbpm-persistence/jbpm-persistence-jpa/src/main/java/org/jbpm/persistence/timer/GlobalJpaTimerJobInstance.java b/jbpm-persistence/jbpm-persistence-jpa/src/main/java/org/jbpm/persistence/timer/GlobalJpaTimerJobInstance.java index c19a959172..16b7de1cfb 100644 --- a/jbpm-persistence/jbpm-persistence-jpa/src/main/java/org/jbpm/persistence/timer/GlobalJpaTimerJobInstance.java +++ b/jbpm-persistence/jbpm-persistence-jpa/src/main/java/org/jbpm/persistence/timer/GlobalJpaTimerJobInstance.java @@ -65,7 +65,7 @@ public GlobalJpaTimerJobInstance(Job job, JobContext ctx, Trigger trigger, @Override public Void call() throws Exception { AsyncExecutionMarker.markAsync(); - ExecutableRunner runner = null; + ExecutableRunner runner = null; TransactionManager jtaTm = null; boolean success = false; try { @@ -88,16 +88,16 @@ public Void call() throws Exception { success = true; return null; } catch( Exception e ) { - e.printStackTrace(); + logger.error("Exception executing timer", e); success = false; throw e; } finally { AsyncExecutionMarker.reset(); - if (runner != null && runner instanceof DisposableCommandService) { - if (allowedToDispose(((DisposableCommandService) runner).getEnvironment())) { - logger.debug("Allowed to dispose command service from global timer job instance"); - ((DisposableCommandService) runner).dispose(); - } + if (runner != null && runner instanceof DisposableCommandService) { + if (allowedToDispose(((DisposableCommandService) runner))) { + logger.debug("Allowed to dispose command service from global timer job instance"); + ((DisposableCommandService) runner).dispose(); + } } closeTansactionIfNeeded(jtaTm, success); } @@ -123,7 +123,11 @@ public String toString() { return "GlobalJpaTimerJobInstance [timerServiceId=" + timerServiceId + ", getJobHandle()=" + getJobHandle() + "]"; } - + + private boolean allowedToDispose(DisposableCommandService disposableCommandService) { + return !disposableCommandService.isDisposed() && allowedToDispose (disposableCommandService.getEnvironment()); + } + protected boolean allowedToDispose(Environment environment) { if (hasEnvironmentEntry(environment, "IS_JTA_TRANSACTION", false)) { return true; diff --git a/jbpm-runtime-manager/src/main/java/org/jbpm/runtime/manager/impl/AbstractRuntimeManager.java b/jbpm-runtime-manager/src/main/java/org/jbpm/runtime/manager/impl/AbstractRuntimeManager.java index 75bfd2631e..09b487f164 100644 --- a/jbpm-runtime-manager/src/main/java/org/jbpm/runtime/manager/impl/AbstractRuntimeManager.java +++ b/jbpm-runtime-manager/src/main/java/org/jbpm/runtime/manager/impl/AbstractRuntimeManager.java @@ -83,6 +83,7 @@ import org.kie.internal.runtime.manager.RuntimeManagerRegistry; import org.kie.internal.runtime.manager.SecurityManager; import org.kie.internal.runtime.manager.SessionFactory; +import org.kie.internal.runtime.manager.SessionNotFoundException; import org.kie.internal.runtime.manager.TaskServiceFactory; import org.kie.internal.runtime.manager.context.ProcessInstanceIdContext; import org.kie.internal.runtime.manager.deploy.DeploymentDescriptorManager; @@ -389,8 +390,8 @@ protected boolean canDispose(RuntimeEngine runtime) { && tm.getStatus() != TransactionManager.STATUS_COMMITTED) { return false; } - } catch (Exception e) { - logger.warn("Exception dealing with transaction", e); + } catch (SessionNotFoundException e) { + logger.warn("Session not found exception", e); } return true; } diff --git a/jbpm-runtime-manager/src/main/java/org/jbpm/runtime/manager/impl/PerProcessInstanceRuntimeManager.java b/jbpm-runtime-manager/src/main/java/org/jbpm/runtime/manager/impl/PerProcessInstanceRuntimeManager.java index 19253e418e..dd46edd7c1 100644 --- a/jbpm-runtime-manager/src/main/java/org/jbpm/runtime/manager/impl/PerProcessInstanceRuntimeManager.java +++ b/jbpm-runtime-manager/src/main/java/org/jbpm/runtime/manager/impl/PerProcessInstanceRuntimeManager.java @@ -19,6 +19,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Set; @@ -198,10 +199,24 @@ public void signalEvent(String type, Object event) { // process currently active runtime engines Map currentlyActive = local.get(); + if (currentlyActive != null && !currentlyActive.isEmpty()) { - RuntimeEngine[] activeEngines = currentlyActive.values().toArray(new RuntimeEngine[currentlyActive.size()]); - for (RuntimeEngine engine : activeEngines) { - Context context = ((RuntimeEngineImpl) engine).getContext(); + @SuppressWarnings("unchecked") + Entry activeEngines[] = currentlyActive.entrySet() + .toArray(new Entry[currentlyActive.size()]); + Set enginesToDelete = new HashSet<>(); + for (Entry engine : activeEngines) { + RuntimeEngineImpl engineImpl = (RuntimeEngineImpl) engine.getValue(); + if (engineImpl==null) { + continue; + } + if (engineImpl.isDisposed() || engineImpl.isInvalid()) { + Object engineKey = engine.getKey(); + logger.trace("Engine with key {} is not longer valid", engineKey); + enginesToDelete.add(engineKey); + continue; + } + Context context = engineImpl.getContext(); if (context != null && context instanceof ProcessInstanceIdContext && ((ProcessInstanceIdContext) context).getContextId() != null) { try { diff --git a/jbpm-runtime-manager/src/main/java/org/jbpm/runtime/manager/impl/RuntimeEngineImpl.java b/jbpm-runtime-manager/src/main/java/org/jbpm/runtime/manager/impl/RuntimeEngineImpl.java index 043b40bb39..1eda7e9f36 100644 --- a/jbpm-runtime-manager/src/main/java/org/jbpm/runtime/manager/impl/RuntimeEngineImpl.java +++ b/jbpm-runtime-manager/src/main/java/org/jbpm/runtime/manager/impl/RuntimeEngineImpl.java @@ -19,14 +19,13 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.jbpm.process.audit.JPAAuditLogService; +import org.jbpm.process.core.DisposableRuntimeEngine; import org.kie.api.runtime.KieSession; import org.kie.api.runtime.manager.Context; import org.kie.api.runtime.manager.RuntimeManager; import org.kie.api.runtime.manager.audit.AuditService; import org.kie.api.task.TaskService; -import org.kie.internal.runtime.manager.Disposable; import org.kie.internal.runtime.manager.DisposeListener; -import org.kie.internal.runtime.manager.InternalRuntimeEngine; import org.kie.internal.runtime.manager.InternalRuntimeManager; import org.kie.internal.runtime.manager.SessionNotFoundException; @@ -36,7 +35,7 @@ * and work item handlers might be interested in receiving notification when the runtime engine is disposed of, * in order deactivate themselves too and not receive any other events. */ -public class RuntimeEngineImpl implements InternalRuntimeEngine, Disposable { +public class RuntimeEngineImpl implements DisposableRuntimeEngine { private RuntimeEngineInitlializer initializer; private Context context; @@ -143,6 +142,7 @@ public void setManager(RuntimeManager manager) { this.manager = manager; } + @Override public boolean isDisposed() { return disposed; } diff --git a/jbpm-services/jbpm-services-ejb/jbpm-services-ejb-timer/src/main/java/org/jbpm/services/ejb/timer/EJBTimerScheduler.java b/jbpm-services/jbpm-services-ejb/jbpm-services-ejb-timer/src/main/java/org/jbpm/services/ejb/timer/EJBTimerScheduler.java index 10f7da7613..1a574fad68 100644 --- a/jbpm-services/jbpm-services-ejb/jbpm-services-ejb-timer/src/main/java/org/jbpm/services/ejb/timer/EJBTimerScheduler.java +++ b/jbpm-services/jbpm-services-ejb/jbpm-services-ejb-timer/src/main/java/org/jbpm/services/ejb/timer/EJBTimerScheduler.java @@ -32,21 +32,18 @@ import javax.annotation.PostConstruct; import javax.annotation.Resource; -import javax.ejb.ConcurrencyManagement; -import javax.ejb.ConcurrencyManagementType; import javax.ejb.Lock; import javax.ejb.LockType; import javax.ejb.NoSuchObjectLocalException; +import javax.ejb.SessionContext; import javax.ejb.Singleton; import javax.ejb.Startup; import javax.ejb.Timeout; import javax.ejb.Timer; import javax.ejb.TimerConfig; import javax.ejb.TimerHandle; -import javax.ejb.TransactionManagement; -import javax.ejb.TransactionManagementType; -import javax.transaction.RollbackException; -import javax.transaction.UserTransaction; +import javax.ejb.TransactionAttribute; +import javax.ejb.TransactionAttributeType; import org.drools.core.time.JobHandle; import org.drools.core.time.impl.TimerJobInstance; @@ -58,8 +55,6 @@ @Singleton @Startup -@ConcurrencyManagement(ConcurrencyManagementType.CONTAINER) -@TransactionManagement(TransactionManagementType.BEAN) @Lock(LockType.READ) public class EJBTimerScheduler { @@ -70,6 +65,8 @@ public class EJBTimerScheduler { private static final Integer TIMER_RETRY_LIMIT = Integer.parseInt(System.getProperty("org.kie.jbpm.timer.retry.limit", "3")); private static final Integer OVERDUE_WAIT_TIME = Integer.parseInt(System.getProperty("org.jbpm.overdue.timer.wait", "20000")); + + private static final Integer OVERDUE_CHECK_TIME = Integer.parseInt(System.getProperty("org.jbpm.overdue.timer.check", "200")); private boolean useLocalCache = Boolean.parseBoolean(System.getProperty("org.jbpm.ejb.timer.local.cache", "false")); @@ -77,10 +74,10 @@ public class EJBTimerScheduler { @Resource protected javax.ejb.TimerService timerService; - + @Resource - protected UserTransaction utx; - + protected SessionContext ctx; + public void setUseLocalCache(boolean useLocalCache) { this.useLocalCache = useLocalCache; } @@ -97,112 +94,78 @@ public void executeTimerJob(Timer timer) { EjbTimerJob timerJob = (EjbTimerJob) timer.getInfo(); TimerJobInstance timerJobInstance = timerJob.getTimerJobInstance(); logger.debug("About to execute timer for job {}", timerJob); - - String timerServiceId = ((EjbGlobalJobHandle) timerJobInstance.getJobHandle()).getDeploymentId(); - // handle overdue timers as ejb timer service might start before all deployments are ready long time = 0; - while (TimerServiceRegistry.getInstance().get(timerServiceId) == null) { - logger.debug("waiting for timer service to be available, elapsed time {} ms", time); - try { - Thread.sleep(500); - } catch (InterruptedException e) { - e.printStackTrace(); - } - time += 500; - - if (time > OVERDUE_WAIT_TIME) { - logger.debug("No timer service found after waiting {} ms", time); - break; + try { + while (TimerServiceRegistry.getInstance().get(((EjbGlobalJobHandle) timerJobInstance.getJobHandle()).getDeploymentId()) == null) { + logger.debug("waiting for timer service to be available, elapsed time {} ms", time); + Thread.sleep(OVERDUE_CHECK_TIME); + time += OVERDUE_CHECK_TIME; + if (time > OVERDUE_WAIT_TIME) { + logger.debug("No timer service found after waiting {} ms", time); + break; + } } - } + } catch (InterruptedException e) { + logger.warn("Thread has been interrupted", e); + Thread.currentThread().interrupt(); + } try { - transaction(this::executeTimerJobInstance, timerJobInstance); - } catch (SessionNotFoundException e) { - logger.warn("Process instance is not found. More likely already completed. Timer {} won't be recovered", timerJobInstance, e); - removeUnrecoverableTimer(timerJob, timer); + invokeTransaction(this::executeTimerJobInstance, timerJobInstance); } catch (Exception e) { recoverTimerJobInstance(timerJob, timer, e); } } private void executeTimerJobInstance(TimerJobInstance timerJobInstance) throws Exception { - try { - ((Callable) timerJobInstance).call(); - } catch (Exception e) { - throw e; - } + ((Callable) timerJobInstance).call(); } - private void removeUnrecoverableTimer(EjbTimerJob ejbTimerJob, Timer timer) { - try { - Transaction tx = timerJobInstance -> { - if (!this.removeJob(timerJobInstance.getJobHandle(), timer)) { - logger.warn("Session not found for timer {}. Timer could not removed.", ejbTimerJob.getTimerJobInstance()); - } - }; - transaction(tx, ejbTimerJob.getTimerJobInstance()); - } catch (Exception e1) { - logger.warn("There was a problem during timer removal {}", ejbTimerJob.getTimerJobInstance(), e1); - } - } - - private void recoverTimerJobInstance(EjbTimerJob ejbTimerJob, Timer timer, Exception e) { - if (isSessionNotFound(e)) { - logger.warn("Trying to recover timer. Not possible due to process instance is not found. More likely already completed. Timer {} won't be recovered", ejbTimerJob.getTimerJobInstance(), e); + private void recoverTimerJobInstance(EjbTimerJob ejbTimerJob, Timer timer, Exception cause) { + Transaction tx; + if (isSessionNotFound(cause)) { // if session is not found means the process has already finished. In this case we just need to remove // the timer and avoid any recovery as it should not trigger any more timers. - removeUnrecoverableTimer(ejbTimerJob, timer); - return; + tx = timerJobInstance -> { + logger.warn("Trying to recover timer. Not possible due to process instance is not found. More likely already completed. Timer {} won't be recovered", timerJobInstance, cause); + if (!removeJob(timerJobInstance.getJobHandle(), timer)) { + logger.warn("Session not found for timer {}. Timer could not removed.", timerJobInstance); + } + }; } - - if (ejbTimerJob.getTimerJobInstance().getTrigger().hasNextFireTime() != null) { + else if (ejbTimerJob.getTimerJobInstance().getTrigger().hasNextFireTime() != null) { // this is an interval trigger. Problem here is that the timer scheduled by DefaultTimerJobInstance is lost // because of the transaction, so we need to do this here. - try { - - logger.warn("Execution of time failed Interval Trigger failed. Skipping {}", ejbTimerJob.getTimerJobInstance()); - Transaction tx = timerJobInstance -> { - if (this.removeJob(timerJobInstance.getJobHandle(), null)) { - this.internalSchedule(timerJobInstance); - } else { - logger.debug("Interval trigger {} was removed before rescheduling", ejbTimerJob.getTimerJobInstance()); - } - }; - transaction(tx, ejbTimerJob.getTimerJobInstance()); - } catch (Exception e1) { - logger.warn("Could not schedule the interval trigger {}", ejbTimerJob.getTimerJobInstance(), e1); - } - return; + tx = timerJobInstance -> { + logger.warn("Execution of time failed Interval Trigger failed. Skipping {}", timerJobInstance); + if (removeJob(timerJobInstance.getJobHandle(), null)) { + internalSchedule(timerJobInstance); + } else { + logger.debug("Interval trigger {} was removed before rescheduling", timerJobInstance); + } + }; + } + else { + // if there is not next date to be fired, we need to apply policy otherwise will be lost + tx = timerJobInstance -> { + logger.warn("Execution of time failed. The timer will be retried {}", timerJobInstance); + ZonedDateTime nextRetry = ZonedDateTime.now().plus(TIMER_RETRY_INTERVAL, ChronoUnit.MILLIS); + EjbTimerJobRetry info = ejbTimerJob instanceof EjbTimerJobRetry ? ((EjbTimerJobRetry) ejbTimerJob).next() : new EjbTimerJobRetry(timerJobInstance); + if (TIMER_RETRY_LIMIT > 0 && info.getRetry() > TIMER_RETRY_LIMIT) { + logger.warn("The timer {} reached retry limit {}. It won't be retried again", timerJobInstance, TIMER_RETRY_LIMIT); + } else { + TimerConfig config = new TimerConfig(info, true); + Timer newTimer = timerService.createSingleActionTimer(Date.from(nextRetry.toInstant()), config); + ((GlobalJpaTimerJobInstance) timerJobInstance).setTimerInfo(newTimer.getHandle()); + ((GlobalJpaTimerJobInstance) timerJobInstance).setExternalTimerId(getPlatformTimerId(newTimer)); + } + }; } - - // if there is not next date to be fired, we need to apply policy otherwise will be lost - - logger.warn("Execution of time failed. The timer will be retried {}", ejbTimerJob.getTimerJobInstance()); - Transaction operation = (instance) -> { - ZonedDateTime nextRetry = ZonedDateTime.now().plus(TIMER_RETRY_INTERVAL, ChronoUnit.MILLIS); - EjbTimerJobRetry info = null; - if(ejbTimerJob instanceof EjbTimerJobRetry) { - info = ((EjbTimerJobRetry) ejbTimerJob).next(); - } else { - info = new EjbTimerJobRetry(instance); - } - if (TIMER_RETRY_LIMIT > 0 && info.getRetry() > TIMER_RETRY_LIMIT) { - logger.warn("The timer {} reached retry limit {}. It won't be retried again", instance, TIMER_RETRY_LIMIT); - return; - } - TimerConfig config = new TimerConfig(info, true); - Timer newTimer = timerService.createSingleActionTimer(Date.from(nextRetry.toInstant()), config); - TimerHandle handler = newTimer.getHandle(); - ((GlobalJpaTimerJobInstance) ejbTimerJob.getTimerJobInstance()).setTimerInfo(handler); - ((GlobalJpaTimerJobInstance) ejbTimerJob.getTimerJobInstance()).setExternalTimerId(getPlatformTimerId(newTimer)); - }; try { - transaction(operation, ejbTimerJob.getTimerJobInstance()); - } catch (Exception e1) { - logger.error("Failed to executed timer recovery {}", e1.getMessage(), e1); + invokeTransaction (tx, ejbTimerJob.getTimerJobInstance()); + } catch (Exception e) { + logger.error("Failed to executed timer recovery", e); } - } private boolean isSessionNotFound(Exception e) { @@ -218,30 +181,16 @@ private boolean isSessionNotFound(Exception e) { @FunctionalInterface private interface Transaction { - void doWork(I item) throws Exception; } - private void transaction(Transaction operation, I item) throws Exception { - try { - utx.begin(); - operation.doWork(item); - utx.commit(); - } catch(RollbackException e) { - logger.warn("Transaction was rolled back for {} with status {}", item, utx.getStatus()); - if(utx.getStatus() == javax.transaction.Status.STATUS_ACTIVE) { - utx.rollback(); - } - throw new RuntimeException("jbpm timer has been rolledback", e); - } catch (Exception e) { - try { - utx.rollback(); - } catch (Exception re) { - logger.error("transaction could not be rolled back", re); - } - - throw e; - } + @TransactionAttribute(value = TransactionAttributeType.REQUIRES_NEW) + public void transaction(Transaction operation, I item) throws Exception { + operation.doWork(item); + } + + private void invokeTransaction (Transaction operation, I item) throws Exception { + ctx.getBusinessObject(EJBTimerScheduler.class).transaction(operation,item); } public void internalSchedule(TimerJobInstance timerJobInstance) { diff --git a/jbpm-services/jbpm-services-ejb/jbpm-services-ejb-timer/src/main/java/org/jbpm/services/ejb/timer/EjbSchedulerService.java b/jbpm-services/jbpm-services-ejb/jbpm-services-ejb-timer/src/main/java/org/jbpm/services/ejb/timer/EjbSchedulerService.java index a6bc947c2b..ebcf8183a4 100644 --- a/jbpm-services/jbpm-services-ejb/jbpm-services-ejb-timer/src/main/java/org/jbpm/services/ejb/timer/EjbSchedulerService.java +++ b/jbpm-services/jbpm-services-ejb/jbpm-services-ejb-timer/src/main/java/org/jbpm/services/ejb/timer/EjbSchedulerService.java @@ -39,7 +39,6 @@ import org.drools.core.time.impl.TimerJobInstance; import org.drools.persistence.api.TransactionManager; import org.drools.persistence.api.TransactionManagerFactory; -import org.drools.persistence.api.TransactionSynchronization; import org.drools.persistence.jta.JtaTransactionManager; import org.jbpm.process.core.timer.GlobalSchedulerService; import org.jbpm.process.core.timer.JobNameHelper; @@ -103,31 +102,12 @@ public boolean removeJob(JobHandle jobHandle) { String uuid = ((EjbGlobalJobHandle) jobHandle).getUuid(); final Timer ejbTimer = getEjbTimer(getTimerMappinInfo(uuid)); if (TRANSACTIONAL && ejbTimer == null) { - // this situation needs to be avoided as it should not happen + logger.warn("EJB timer is null for uuid {} and transactional flag is enabled", uuid); return false; } - JtaTransactionManager tm = (JtaTransactionManager) TransactionManagerFactory.get().newTransactionManager(); - try { - tm.registerTransactionSynchronization(new TransactionSynchronization() { - @Override - public void beforeCompletion() { - } - - @Override - public void afterCompletion(int status) { - if (status == TransactionManager.STATUS_COMMITTED) { - logger.debug("remove job {} after commited", jobHandle); - scheduler.removeJob(jobHandle, ejbTimer); - } - } - - }); - logger.debug("register tx to remove job {}", jobHandle); - return true; - } catch (Exception e) { - logger.debug("remove job {} outside tx", jobHandle); - return scheduler.removeJob(jobHandle, ejbTimer); - } + boolean result = scheduler.removeJob(jobHandle, ejbTimer); + logger.debug("Remove job returned {}", result); + return result; } private TimerJobInstance getTimerJobInstance (String uuid) {