Skip to content

Commit

Permalink
avoid race condition for awaiting signals and process instance completed
Browse files Browse the repository at this point in the history
  • Loading branch information
elguardian committed Mar 20, 2024
1 parent 9e77a4f commit e8834bc
Showing 1 changed file with 15 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.kie.api.task.TaskService;
import org.kie.internal.command.RegistryContext;
import org.kie.internal.runtime.manager.Disposable;
import org.kie.internal.runtime.manager.InternalRuntimeEngine;
import org.kie.internal.runtime.manager.InternalRuntimeManager;
import org.kie.internal.runtime.manager.Mapper;
import org.kie.internal.runtime.manager.SessionFactory;
Expand Down Expand Up @@ -163,11 +164,10 @@ public RuntimeEngine getRuntimeEngine(Context<?> context) {

@Override
public void signalEvent(String type, Object event) {

// first signal with new context in case there are start event with signal
KieSession signalSession = null;
Set<RuntimeEngine> signalledEngines = new HashSet<>();
RuntimeEngine runtimeEngine = getRuntimeEngine(ProcessInstanceIdContext.get());
RuntimeEngineImpl runtimeEngine = (RuntimeEngineImpl) getRuntimeEngine(ProcessInstanceIdContext.get());
try {
// signal execution can rise errors
signalSession = runtimeEngine.getKieSession();
Expand All @@ -182,32 +182,35 @@ public void signalEvent(String type, Object event) {
}

// next find out all instances waiting for given event type
runtimeEngine = null;
List<String> processInstances = ((InternalMapper) mapper).findContextIdForEvent(type, getIdentifier());
for (String piId : processInstances) {
runtimeEngine = getRuntimeEngine(ProcessInstanceIdContext.get(Long.parseLong(piId)));
try {
runtimeEngine = (RuntimeEngineImpl) getRuntimeEngine(ProcessInstanceIdContext.get(Long.parseLong(piId)));
// signal execution can rise errors
if (!signalledEngines.contains(runtimeEngine)) {
runtimeEngine.getKieSession().signalEvent(type, event);
signalledEngines.add(runtimeEngine);
runtimeEngine.getKieSession().signalEvent(type, event);
}
} catch (org.drools.persistence.api.SessionNotFoundException ex) {
logger.warn("Signal event cannot proceed because of session not found exception {} for engine {}", ex.getMessage(), runtimeEngine.getKieSessionId());
} finally {
// ensure we clean up
disposeRuntimeEngine(runtimeEngine);
if (runtimeEngine != null) {
disposeRuntimeEngine(runtimeEngine);
}
}
}

// process currently active runtime engines
Map<Object, RuntimeEngine> currentlyActive = local.get();

if (currentlyActive != null && !currentlyActive.isEmpty()) {
@SuppressWarnings("unchecked")
Entry<Object, RuntimeEngine> activeEngines[] = currentlyActive.entrySet()
.toArray(new Entry[currentlyActive.size()]);
Entry<Object, RuntimeEngine> activeEngines[] = currentlyActive.entrySet().toArray(new Entry[currentlyActive.size()]);
Set<Object> enginesToDelete = new HashSet<>();
for (Entry<Object, RuntimeEngine> engine : activeEngines) {
RuntimeEngineImpl engineImpl = (RuntimeEngineImpl) engine.getValue();
if (engineImpl==null) {
if (engineImpl == null) {
continue;
}
if (engineImpl.isDisposed() || engineImpl.isInvalid()) {
Expand All @@ -217,13 +220,11 @@ public void signalEvent(String type, Object event) {
continue;
}
Context<?> context = engineImpl.getContext();
if (context != null && context instanceof ProcessInstanceIdContext
&& ((ProcessInstanceIdContext) context).getContextId() != null) {
if (context instanceof ProcessInstanceIdContext && ((ProcessInstanceIdContext) context).getContextId() != null) {
try {
if (!signalledEngines.contains(engineImpl)) {
engineImpl.getKieSession().signalEvent(type, event,
((ProcessInstanceIdContext) context).getContextId());
signalledEngines.add(engineImpl);
engineImpl.getKieSession().signalEvent(type, event, ((ProcessInstanceIdContext) context).getContextId());
}
} catch (org.drools.persistence.api.SessionNotFoundException ex) {
logger.warn(
Expand Down

0 comments on commit e8834bc

Please sign in to comment.