Skip to content

Commit

Permalink
Merge branch '7.67.x' into 7.67.x_JBPM-10208
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Nov 17, 2023
2 parents 8032774 + 1a7fb8b commit 6570070
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 163 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jbpm.process.core;

import org.kie.internal.runtime.manager.Disposable;
import org.kie.internal.runtime.manager.InternalRuntimeEngine;

public interface DisposableRuntimeEngine extends InternalRuntimeEngine, Disposable {
boolean isDisposed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.drools.core.time.impl.DefaultJobHandle;
import org.drools.core.time.impl.TimerJobFactoryManager;
import org.drools.core.time.impl.TimerJobInstance;
import org.jbpm.process.core.DisposableRuntimeEngine;
import org.jbpm.process.core.timer.GlobalSchedulerService;
import org.jbpm.process.core.timer.NamedJobContext;
import org.jbpm.process.instance.timer.TimerManager.ProcessJobContext;
Expand Down Expand Up @@ -420,7 +421,10 @@ public Environment getEnvironment() {

return runtime.getKieSession().getEnvironment();
}


public boolean isDisposed() {
return runtime instanceof DisposableRuntimeEngine && ((DisposableRuntimeEngine)runtime).isDisposed();
}
}

public List<TimerServiceListener> getListeners() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
public class JpaProcessPersistenceContext extends JpaPersistenceContext
implements
ProcessPersistenceContext {


public JpaProcessPersistenceContext(EntityManager em, TransactionManager txm) {
super( em, txm );
Expand All @@ -66,7 +67,10 @@ public PersistentProcessInstance findProcessInstanceInfo(Long processId) {
if( this.pessimisticLocking ) {
return em.find( ProcessInstanceInfo.class, processId, lockMode );
}
return em.find( ProcessInstanceInfo.class, processId );
logger.trace("Reading process instance info {} with em {}", processId, em);
ProcessInstanceInfo pi = em.find(ProcessInstanceInfo.class, processId);
logger.trace("Process instance info read {}", pi);
return pi;
}

public void remove(PersistentProcessInstance processInstanceInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,14 @@
import org.jbpm.workflow.instance.impl.WorkflowProcessInstanceImpl;
import org.kie.api.runtime.Environment;
import org.kie.api.runtime.process.ProcessInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Entity
@SequenceGenerator(name="processInstanceInfoIdSeq", sequenceName="PROCESS_INSTANCE_INFO_ID_SEQ")
public class ProcessInstanceInfo implements PersistentProcessInstance {

private static final Logger logger = LoggerFactory.getLogger(ProcessInstanceInfo.class);
@Id
@GeneratedValue(strategy = GenerationType.AUTO, generator="processInstanceInfoIdSeq")
@Column(name = "InstanceId")
Expand Down Expand Up @@ -192,9 +195,10 @@ public ProcessInstance getProcessInstance(InternalKnowledgeRuntime kruntime,
}
context.close();
} catch ( IOException e ) {
e.printStackTrace();
throw new IllegalArgumentException( "IOException while loading process instance: " + e.getMessage(),
e );
throw new IllegalArgumentException( "IOException while loading process instance: " + e.getMessage(), e);
} catch (RuntimeException e) {
logger.error("Error unmarshalling process instance info {}", processInstanceId, e);
throw e;
}
}
((WorkflowProcessInstanceImpl) processInstance).internalSetStartDate(this.startDate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public GlobalJpaTimerJobInstance(Job job, JobContext ctx, Trigger trigger,
@Override
public Void call() throws Exception {
AsyncExecutionMarker.markAsync();
ExecutableRunner runner = null;
ExecutableRunner<?> runner = null;
TransactionManager jtaTm = null;
boolean success = false;
try {
Expand All @@ -88,16 +88,16 @@ public Void call() throws Exception {
success = true;
return null;
} catch( Exception e ) {
e.printStackTrace();
logger.error("Exception executing timer", e);
success = false;
throw e;
} finally {
AsyncExecutionMarker.reset();
if (runner != null && runner instanceof DisposableCommandService) {
if (allowedToDispose(((DisposableCommandService) runner).getEnvironment())) {
logger.debug("Allowed to dispose command service from global timer job instance");
((DisposableCommandService) runner).dispose();
}
if (runner != null && runner instanceof DisposableCommandService) {
if (allowedToDispose(((DisposableCommandService) runner))) {
logger.debug("Allowed to dispose command service from global timer job instance");
((DisposableCommandService) runner).dispose();
}
}
closeTansactionIfNeeded(jtaTm, success);
}
Expand All @@ -123,7 +123,11 @@ public String toString() {
return "GlobalJpaTimerJobInstance [timerServiceId=" + timerServiceId
+ ", getJobHandle()=" + getJobHandle() + "]";
}


private boolean allowedToDispose(DisposableCommandService disposableCommandService) {
return !disposableCommandService.isDisposed() && allowedToDispose (disposableCommandService.getEnvironment());
}

protected boolean allowedToDispose(Environment environment) {
if (hasEnvironmentEntry(environment, "IS_JTA_TRANSACTION", false)) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.kie.internal.runtime.manager.RuntimeManagerRegistry;
import org.kie.internal.runtime.manager.SecurityManager;
import org.kie.internal.runtime.manager.SessionFactory;
import org.kie.internal.runtime.manager.SessionNotFoundException;
import org.kie.internal.runtime.manager.TaskServiceFactory;
import org.kie.internal.runtime.manager.context.ProcessInstanceIdContext;
import org.kie.internal.runtime.manager.deploy.DeploymentDescriptorManager;
Expand Down Expand Up @@ -389,8 +390,8 @@ protected boolean canDispose(RuntimeEngine runtime) {
&& tm.getStatus() != TransactionManager.STATUS_COMMITTED) {
return false;
}
} catch (Exception e) {
logger.warn("Exception dealing with transaction", e);
} catch (SessionNotFoundException e) {
logger.warn("Session not found exception", e);
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;

Expand Down Expand Up @@ -198,10 +199,24 @@ public void signalEvent(String type, Object event) {

// process currently active runtime engines
Map<Object, RuntimeEngine> currentlyActive = local.get();

if (currentlyActive != null && !currentlyActive.isEmpty()) {
RuntimeEngine[] activeEngines = currentlyActive.values().toArray(new RuntimeEngine[currentlyActive.size()]);
for (RuntimeEngine engine : activeEngines) {
Context<?> context = ((RuntimeEngineImpl) engine).getContext();
@SuppressWarnings("unchecked")
Entry<Object, RuntimeEngine> activeEngines[] = currentlyActive.entrySet()
.toArray(new Entry[currentlyActive.size()]);
Set<Object> enginesToDelete = new HashSet<>();
for (Entry<Object, RuntimeEngine> engine : activeEngines) {
RuntimeEngineImpl engineImpl = (RuntimeEngineImpl) engine.getValue();
if (engineImpl==null) {
continue;
}
if (engineImpl.isDisposed() || engineImpl.isInvalid()) {
Object engineKey = engine.getKey();
logger.trace("Engine with key {} is not longer valid", engineKey);
enginesToDelete.add(engineKey);
continue;
}
Context<?> context = engineImpl.getContext();
if (context != null && context instanceof ProcessInstanceIdContext
&& ((ProcessInstanceIdContext) context).getContextId() != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
import java.util.concurrent.CopyOnWriteArrayList;

import org.jbpm.process.audit.JPAAuditLogService;
import org.jbpm.process.core.DisposableRuntimeEngine;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.manager.Context;
import org.kie.api.runtime.manager.RuntimeManager;
import org.kie.api.runtime.manager.audit.AuditService;
import org.kie.api.task.TaskService;
import org.kie.internal.runtime.manager.Disposable;
import org.kie.internal.runtime.manager.DisposeListener;
import org.kie.internal.runtime.manager.InternalRuntimeEngine;
import org.kie.internal.runtime.manager.InternalRuntimeManager;
import org.kie.internal.runtime.manager.SessionNotFoundException;

Expand All @@ -36,7 +35,7 @@
* and work item handlers might be interested in receiving notification when the runtime engine is disposed of,
* in order deactivate themselves too and not receive any other events.
*/
public class RuntimeEngineImpl implements InternalRuntimeEngine, Disposable {
public class RuntimeEngineImpl implements DisposableRuntimeEngine {

private RuntimeEngineInitlializer initializer;
private Context<?> context;
Expand Down Expand Up @@ -143,6 +142,7 @@ public void setManager(RuntimeManager manager) {
this.manager = manager;
}

@Override
public boolean isDisposed() {
return disposed;
}
Expand Down
Loading

0 comments on commit 6570070

Please sign in to comment.