diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java index 4ba5a73..b5ffa45 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionAspect.java @@ -25,8 +25,17 @@ public static class TaskHandlerActions { public Supplier>> handlerCompletedCommand; } + public static class RunDeferredInTransaction { + public RunDeferredInTransactionSupplier[] argsSupplier; + public Runnable saveAggregateAfterWorkflowTask; + } + + public interface RunDeferredInTransactionSupplier extends Supplier { } + public static final ThreadLocal actions = ThreadLocal.withInitial(TaskHandlerActions::new); + public static final ThreadLocal runDeferredInTransaction = ThreadLocal.withInitial(RunDeferredInTransaction::new); + private final ApplicationEventPublisher publisher; public Camunda8TransactionAspect( @@ -36,20 +45,44 @@ public Camunda8TransactionAspect( } + public static void registerDeferredInTransaction( + final RunDeferredInTransactionSupplier[] argsSupplier, + final Runnable saveAggregateAfterWorkflowTask) { + + runDeferredInTransaction.get().argsSupplier = argsSupplier; + runDeferredInTransaction.get().saveAggregateAfterWorkflowTask = saveAggregateAfterWorkflowTask; + + } + + public static void unregisterDeferredInTransaction() { + + runDeferredInTransaction.get().argsSupplier = null; + runDeferredInTransaction.get().saveAggregateAfterWorkflowTask = null; + + } + + private void saveWorkflowAggregate() { + + runDeferredInTransaction.get().saveAggregateAfterWorkflowTask.run(); + + } + @Around("@annotation(io.vanillabp.spi.service.WorkflowTask)") private Object checkForTransaction( final ProceedingJoinPoint pjp) throws Throwable { final var methodSignature = pjp.getSignature().toLongString(); - - if (!TransactionSynchronizationManager.isActualTransactionActive()) { - clearCallbacks(); - logger.trace("Disable TX callbacks for {}: No TX active", methodSignature); - } + + final var isTxActive = TransactionSynchronizationManager.isActualTransactionActive(); + try { - final var value = pjp.proceed(); - if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) { + final var newArgs = runDeferredInTransactionArgsSupplier(pjp.getArgs()); + final var value = pjp.proceed(newArgs); // run @WorkflowTask annotated method + saveWorkflowAggregate(); + + if (isTxActive + && (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null)) { final var handlerTestCommand = actions.get().testForTaskAlreadyCompletedOrCancelledCommand.get(); if (handlerTestCommand != null) { publisher.publishEvent( @@ -62,18 +95,37 @@ private Object checkForTransaction( if (actions.get().handlerCompletedCommand != null) { final var handlerCompletedCommand = actions.get().handlerCompletedCommand.get(); if (handlerCompletedCommand != null) { - publisher.publishEvent( - new Camunda8TransactionProcessor.Camunda8CommandAfterTx( - methodSignature, - handlerCompletedCommand.getKey(), - handlerCompletedCommand.getValue())); + if (isTxActive) { + publisher.publishEvent( + new Camunda8TransactionProcessor.Camunda8CommandAfterTx( + methodSignature, + handlerCompletedCommand.getKey(), + handlerCompletedCommand.getValue())); + } else { + try { + handlerCompletedCommand.getKey().run(); + } catch (Exception e) { + final var description = handlerCompletedCommand.getValue(); + if (description != null) { + logger.error( + "Could not execute '{}'! Manual action required!", + description.get(), + e); + } else { + logger.error( + "Manual action required due to:", + e); + } + } + } } } return value; } catch (TaskException taskError) { - if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) { + if (isTxActive + && (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null)) { final var handlerTestCommand = actions.get().testForTaskAlreadyCompletedOrCancelledCommand.get(); if (handlerTestCommand != null) { publisher.publishEvent( @@ -86,17 +138,35 @@ private Object checkForTransaction( if (actions.get().bpmnErrorCommand != null) { final var runnable = actions.get().bpmnErrorCommand.getKey(); final var description = actions.get().bpmnErrorCommand.getValue(); - publisher.publishEvent( - new Camunda8TransactionProcessor.Camunda8CommandAfterTx( - methodSignature, - () -> runnable.accept(taskError), - () -> description.apply(taskError))); + if (isTxActive) { + publisher.publishEvent( + new Camunda8TransactionProcessor.Camunda8CommandAfterTx( + methodSignature, + () -> runnable.accept(taskError), + () -> description.apply(taskError))); + } else { + try { + runnable.accept(taskError); + } catch (Exception e) { + if (description != null) { + logger.error( + "Could not execute '{}'! Manual action required!", + description.apply(taskError), + e); + } else { + logger.error( + "Manual action required due to:", + e); + } + } + } } return null; } catch (Exception e) { - if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) { + if (isTxActive + && (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null)) { final var handlerTestCommand = actions.get().testForTaskAlreadyCompletedOrCancelledCommand.get(); if (handlerTestCommand != null) { publisher.publishEvent( @@ -109,18 +179,31 @@ private Object checkForTransaction( if (actions.get().handlerFailedCommand != null) { final var runnable = actions.get().handlerFailedCommand.getKey(); final var description = actions.get().handlerFailedCommand.getValue(); - publisher.publishEvent( - new Camunda8TransactionProcessor.Camunda8CommandAfterTx( - methodSignature, - () -> runnable.accept(e), - () -> description.apply(e))); + if (isTxActive) { + publisher.publishEvent( + new Camunda8TransactionProcessor.Camunda8CommandAfterTx( + methodSignature, + () -> runnable.accept(e), + () -> description.apply(e))); + } else { + try { + runnable.accept(e); + } catch (Exception ie) { + if (description != null) { + logger.error( + "Could not execute '{}'! Manual action required!", + description.apply(e), + ie); + } else { + logger.error( + "Manual action required due to:", + ie); + } + } + } } throw e; - } finally { - - clearCallbacks(); - } } @@ -134,4 +217,25 @@ public static void clearCallbacks() { } + private Object[] runDeferredInTransactionArgsSupplier( + final Object[] originalArgs) { + + if (originalArgs == null) { + return null; + } + + final var newArgs = new Object[ originalArgs.length ]; + for (var i = 0; i < originalArgs.length; ++i) { + final var supplier = runDeferredInTransaction.get().argsSupplier[i]; + if (supplier != null) { + newArgs[i] = supplier.get(); + } else { + newArgs[i] = originalArgs[i]; + } + } + + return newArgs; + + } + } diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java index 82a6b27..cf19ec4 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/service/Camunda8TransactionProcessor.java @@ -63,11 +63,7 @@ public static Map.Entry> handlerCompletedCommandCallb public static void unregisterCallbacks() { - final var actions = Camunda8TransactionAspect.actions.get(); - actions.testForTaskAlreadyCompletedOrCancelledCommand = null; - actions.bpmnErrorCommand = null; - actions.handlerFailedCommand = null; - actions.handlerCompletedCommand = null; + Camunda8TransactionAspect.clearCallbacks(); } diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java index 1904f16..57f7b1c 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskHandler.java @@ -4,18 +4,23 @@ import io.camunda.zeebe.client.api.response.ActivatedJob; import io.camunda.zeebe.client.api.worker.JobClient; import io.camunda.zeebe.client.api.worker.JobHandler; +import io.vanillabp.camunda8.service.Camunda8TransactionAspect; import io.vanillabp.camunda8.service.Camunda8TransactionProcessor; import io.vanillabp.camunda8.wiring.Camunda8Connectable.Type; import io.vanillabp.camunda8.wiring.parameters.Camunda8MultiInstanceIndexMethodParameter; import io.vanillabp.camunda8.wiring.parameters.Camunda8MultiInstanceTotalMethodParameter; +import io.vanillabp.spi.service.MultiInstanceElementResolver; import io.vanillabp.spi.service.TaskEvent; import io.vanillabp.spi.service.TaskException; import io.vanillabp.springboot.adapter.MultiInstance; import io.vanillabp.springboot.adapter.TaskHandlerBase; import io.vanillabp.springboot.adapter.wiring.WorkflowAggregateCache; import io.vanillabp.springboot.parameters.MethodParameter; +import io.vanillabp.springboot.parameters.ResolverBasedMultiInstanceMethodParameter; +import io.vanillabp.springboot.parameters.WorkflowAggregateMethodParameter; import java.lang.reflect.Method; import java.time.Duration; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -61,19 +66,17 @@ public void accept( @Override protected Logger getLogger() { - + return logger; - + } @SuppressWarnings("unchecked") @Override public void handle( final JobClient client, - final ActivatedJob job) { + final ActivatedJob job) throws Exception { - Runnable jobPostAction = null; - Supplier description = null; try { final var businessKey = getVariable(job, idPropertyName); @@ -87,6 +90,10 @@ public void handle( final var taskIdRetrieved = new AtomicBoolean(false); final var workflowAggregateCache = new WorkflowAggregateCache(); + Camunda8TransactionAspect.registerDeferredInTransaction( + new Camunda8TransactionAspect.RunDeferredInTransactionSupplier[parameters.size()], + saveAggregateAfterWorkflowTask(workflowAggregateCache)); + // Any callback used in this method is executed in case of no active transaction. // In case of an active transaction the callbacks are used by the Camunda8TransactionInterceptor. Camunda8TransactionProcessor.registerCallbacks( @@ -97,7 +104,7 @@ public void handle( if (taskIdRetrieved.get()) { // async processing of service-task return null; } - return doTestForTaskWasCompletedOrCancelled(job); + return testForTaskWasCompletedOrCancelled(job); }, doThrowError(client, job, workflowAggregateCache), doFailed(client, job), @@ -117,7 +124,7 @@ public void handle( super.execute( workflowAggregateCache, businessKey, - true, + false, // will be done within transaction boundaries (args, param) -> processTaskParameter( args, param, @@ -157,43 +164,9 @@ public void handle( return workflowAggregateCache.workflowAggregate; }, multiInstanceSupplier)); - final var callback = Camunda8TransactionProcessor.handlerCompletedCommandCallback(); - if (callback != null) { - jobPostAction = callback.getKey(); - description = callback.getValue(); - } - } catch (TaskException bpmnError) { - final var callback = Camunda8TransactionProcessor.bpmnErrorCommandCallback(); - if (callback != null) { - jobPostAction = () -> callback.getKey().accept(bpmnError); - description = () -> callback.getValue().apply(bpmnError); - } - } catch (Exception e) { - final var callback = Camunda8TransactionProcessor.handlerFailedCommandCallback(); - if (callback != null) { - logger.error("Failed to execute job '{}'", job.getKey(), e); - jobPostAction = () -> callback.getKey().accept(e); - description = () -> callback.getValue().apply(e); - } } finally { Camunda8TransactionProcessor.unregisterCallbacks(); - } - - if (jobPostAction != null) { - try { - jobPostAction.run(); - } catch (Exception e) { - if (description != null) { - logger.error( - "Could not execute '{}'! Manual action required!", - description.get(), - e); - } else { - logger.error( - "Manual action required due to:", - e); - } - } + Camunda8TransactionAspect.unregisterDeferredInTransaction(); } } @@ -205,53 +178,64 @@ protected Object getMultiInstanceElement( return multiInstanceSupplier .apply(name); - + } - + @Override protected Integer getMultiInstanceIndex( final String name, final Function multiInstanceSupplier) { - + return (Integer) multiInstanceSupplier .apply(name + Camunda8MultiInstanceIndexMethodParameter.SUFFIX) - 1; - + } - + @Override protected Integer getMultiInstanceTotal( final String name, final Function multiInstanceSupplier) { - + return (Integer) multiInstanceSupplier .apply(name + Camunda8MultiInstanceTotalMethodParameter.SUFFIX); - + } - + @Override protected MultiInstance getMultiInstance( final String name, final Function multiInstanceSupplier) { - + return new MultiInstance( getMultiInstanceElement(name, multiInstanceSupplier), getMultiInstanceTotal(name, multiInstanceSupplier), getMultiInstanceIndex(name, multiInstanceSupplier)); - + } - + private Object getVariable( final ActivatedJob job, final String name) { - + return job .getVariablesAsMap() .get(name); - + } - @SuppressWarnings("unchecked") - public Map.Entry> doTestForTaskWasCompletedOrCancelled( + public Runnable saveAggregateAfterWorkflowTask( + final WorkflowAggregateCache aggregateCache) { + + return () -> { + if (aggregateCache.workflowAggregate != null) { + workflowAggregateRepository.save(aggregateCache.workflowAggregate); + } + }; + + } + + @SuppressWarnings("unchecked") + public Map.Entry> testForTaskWasCompletedOrCancelled( final ActivatedJob job) { return Map.entry( @@ -259,7 +243,8 @@ public Map.Entry> doTestForTaskWasCompletedOrCancelle .newUpdateTimeoutCommand(job) .timeout(Duration.ofMinutes(10)) .send() - .join(5, TimeUnit.MINUTES), // needs to run synchronously + .join(5, TimeUnit.MINUTES) + , // needs to run synchronously () -> "update timeout (BPMN: " + job.getBpmnProcessId() + "; Element: " + job.getElementId() + "; Task-Definition: " + job.getType() @@ -317,7 +302,9 @@ private Map.Entry, Function> doTh throwErrorCommand .send() - .exceptionally(t -> { throw new RuntimeException("error", t); }); + .exceptionally(t -> { + throw new RuntimeException("error", t); + }); }, taskException -> "throw error command (BPMN: " + job.getBpmnProcessId() + "; Element: " + job.getElementId() @@ -326,7 +313,7 @@ private Map.Entry, Function> doTh + "; Job: " + job.getKey() + ")"); } - + @SuppressWarnings("unchecked") private Map.Entry, Function> doFailed( final JobClient jobClient, @@ -352,4 +339,70 @@ private Map.Entry, Function> doFailed( } + protected boolean processWorkflowAggregateParameter( + final Object[] args, + final MethodParameter param, + final WorkflowAggregateCache workflowAggregateCache, + final Object workflowAggregateId) { + + if (!(param instanceof WorkflowAggregateMethodParameter)) { + return true; + } + + Camunda8TransactionAspect.runDeferredInTransaction.get().argsSupplier[param.getIndex()] = () -> { + // Using findById is required to get an object instead of a Hibernate proxy. + // Otherwise for e.g. Camunda8 connector JSON serialization of the + // workflow aggregate is not possible. + workflowAggregateCache.workflowAggregate = workflowAggregateRepository + .findById(workflowAggregateId) + .orElse(null); + return workflowAggregateCache.workflowAggregate; + }; + + args[param.getIndex()] = null; // will be set by deferred execution of supplier + + return false; + + } + + protected boolean processMultiInstanceResolverParameter( + final Object[] args, + final MethodParameter param, + final Supplier workflowAggregate, + final Function multiInstanceSupplier) { + + if (!(param instanceof ResolverBasedMultiInstanceMethodParameter)) { + return true; + } + + @SuppressWarnings("unchecked") + final var resolver = + (MultiInstanceElementResolver) + ((ResolverBasedMultiInstanceMethodParameter) param).getResolverBean(); + + final var multiInstances = new HashMap>(); + + resolver + .getNames() + .forEach(name -> multiInstances.put(name, getMultiInstance(name, multiInstanceSupplier))); + + Camunda8TransactionAspect.runDeferredInTransaction.get().argsSupplier[param.getIndex()] = () -> { + try { + return resolver.resolve(workflowAggregate.get(), multiInstances); + } catch (Exception e) { + throw new RuntimeException( + "Failed processing MultiInstanceElementResolver for parameter '" + + param.getParameter() + + "' of method '" + + method + + "'", e); + } + }; + + args[param.getIndex()] = null; // will be set by deferred execution of supplier + + return false; + + } + }