From 65d7bcf6d2d5cbe1c4e1af21d71d21946daaa5c2 Mon Sep 17 00:00:00 2001 From: Stephan Pelikan Date: Thu, 18 Apr 2024 16:02:19 +0200 Subject: [PATCH] Provide ability to set task-specific Zeebe properties --- spring-boot/README.md | 42 ++++ .../camunda8/Camunda8VanillaBpProperties.java | 224 +++++++++++++++++- .../camunda8/wiring/Camunda8TaskWiring.java | 49 ++-- 3 files changed, 284 insertions(+), 31 deletions(-) diff --git a/spring-boot/README.md b/spring-boot/README.md index e668160..b2dfcf8 100644 --- a/spring-boot/README.md +++ b/spring-boot/README.md @@ -16,6 +16,7 @@ To run Camunda 8 on your local computer for development purposes consider to use 1. [Worker ID](#worker-id) 1. [Module aware deployment](#module-aware-deployment) 1. [SPI Binding validation](#spi-binding-validation) + 1. [Managing Camunda 8 connectivity](#managing-camunda-8-connectivity) 1. [Multi-instance](#multi-instance) 1. [Message correlation IDs](#message-correlation-ids) 1. [Transaction behavior](#transaction-behavior) @@ -94,6 +95,47 @@ spring: ddl-auto: update ``` +### Managing Camunda 8 connectivity + +The Camunda 8 adapter is based on the [spring-zeebe](https://github.com/camunda-community-hub/spring-zeebe) client. +Therefore, all settings about Camunda 8 connectivity have to be provided as can be found in that [documentation](https://github.com/camunda-community-hub/spring-zeebe#configuring-camunda-8-connection). + +However, there are a couple of settings specific to tasks: + +1. `task-timeout`: The lock duration Zeebe will wait, after the task has been fetched by your implementation, + until Zeebe assumes your execution has crashed and the task needs to be re-fetched. Zeebe`s current + default value is 5 minutes. Set this values in respect to the expected time to complete the task. +1. `stream-enabled`: Whether to use polling or streaming to receive new tasks. Current default value is `false` (means polling). +1. `stream-timeout`: The duration to refresh the stream's connection. Current default value is no timeout. +1. `poll-interval`: Interval to poll for new tasks, if streaming of tasks is disabled. Zeebe's current default value is 100ms. +1. `poll-request-timeout`: The request-timeout for polling Zeebe for new tasks. Zeebe`s current default value is 20 seconds. + +All these values can be set for specific tasks or all tasks of a workflow or all tasks +of a workflow module. Task-specific values will override workflow's or workflow-module's values and workflow-specific +values will override workflow-module's values: + +```yaml +vanillabp: + workflow-modules: + Demo: + adapters: + camunda8: + # default to all workflows of the workflow-module `Demo` + task-timeout: PT5M + workflows: + DemoWorkflow: + adapters: + camunda8: + # default to all tasks of the workflow `DemoWorkflow` + task-timeout: P10M # overrides vanillabp.workflow-modules.Demo.adapters.camunda8.task-timeout + tasks: + logError: + adapters: + camunda8: + # used only for the task 'logError' of the workflow `DemoWorkflow` + task-timeout: PT3S # overrides vanillabp.workflow-modules.Demo.workflows.DemoWorkflow.adapters.camunda8.task-timeout +``` + ## Multi-instance Since Camunda 8 is a remote engine the workflow is processed in a different runtime environment. Due to this fact the Blueprint adapter cannot do the entire binding of multi-instance context information under the hood. In the BPMN the multi-instance aspects like the input element, the element's index and the total number of elements have to be defined according to a certain naming convention: diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8VanillaBpProperties.java b/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8VanillaBpProperties.java index 1df3507..26e459b 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8VanillaBpProperties.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/Camunda8VanillaBpProperties.java @@ -1,9 +1,11 @@ package io.vanillabp.camunda8; +import io.camunda.zeebe.client.api.worker.JobWorkerBuilderStep1; import io.vanillabp.springboot.adapter.VanillaBpProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.util.StringUtils; +import java.time.Duration; import java.util.Map; @ConfigurationProperties(prefix = VanillaBpProperties.PREFIX, ignoreUnknownFields = true) @@ -44,7 +46,58 @@ public String getTenantId( } - public static class AdapterConfiguration { + public WorkerProperties getUserTaskWorkerProperties( + final String workflowModuleId) { + + return getWorkerProperties(workflowModuleId, null, null); + + } + + public WorkerProperties getWorkerProperties( + final String workflowModuleId, + final String bpmnProcessId, + final String taskDefinition) { + + WorkerProperties result = new WorkerProperties(); + + final var workflowModule = workflowModules.get(workflowModuleId); + if (workflowModule == null) { + return result; + } + final var workflowModuleAdapter = workflowModule.getAdapters().get(Camunda8AdapterConfiguration.ADAPTER_ID); + if (workflowModuleAdapter != null) { + result.apply(workflowModuleAdapter); + } + + if (bpmnProcessId == null) { + return result; + } + final var workflow = workflowModule.getWorkflows().get(bpmnProcessId); + if (workflow == null) { + return result; + } + final var workflowAdapter = workflow.getAdapters().get(Camunda8AdapterConfiguration.ADAPTER_ID); + if (workflowAdapter != null) { + result.apply(workflowAdapter); + } + + if (taskDefinition == null) { + return result; + } + final var task = workflow.getTasks().get(taskDefinition); + if (task == null) { + return result; + } + final var taskAdapter = task.getAdapters().get(Camunda8AdapterConfiguration.ADAPTER_ID); + if (taskAdapter != null) { + result.apply(taskAdapter); + } + + return result; + + } + + public static class AdapterConfiguration extends WorkerProperties { private boolean useTenants = true; @@ -74,6 +127,8 @@ public static class WorkflowModuleAdapterProperties { private Map adapters = Map.of(); + private Map workflows = Map.of(); + public Map getAdapters() { return adapters; } @@ -82,6 +137,173 @@ public void setAdapters(Map adapters) { this.adapters = adapters; } + public Map getWorkflows() { return workflows; } + + public void setWorkflows(Map workflows) { + + this.workflows = workflows; + workflows.forEach((bpmnProcessId, properties) -> { + properties.bpmnProcessId = bpmnProcessId; + properties.workflowModule = this; + }); + + } + + } + + public static class WorkflowAdapterProperties { + + String bpmnProcessId; + + WorkflowModuleAdapterProperties workflowModule; + + private Map adapters = Map.of(); + + private Map tasks = Map.of(); + + public WorkflowModuleAdapterProperties getWorkflowModule() { + return workflowModule; + } + + public String getBpmnProcessId() { + return bpmnProcessId; + } + + public Map getAdapters() { + return adapters; + } + + public void setAdapters(Map adapters) { + this.adapters = adapters; + } + + public Map getTasks() { + return tasks; + } + + public void setTasks(Map tasks) { + this.tasks = tasks; + } + + } + + public static class WorkerProperties { + + public WorkerProperties() {} + + public void apply( + final WorkerProperties original) { + + if (original.taskTimeout != null) { + this.taskTimeout = original.taskTimeout; + } + if (original.pollInterval != null) { + this.pollInterval = original.pollInterval; + } + if (original.pollRequestTimeout != null) { + this.pollRequestTimeout = original.pollRequestTimeout; + } + if (original.isStreamEnabled() != null) { + this.streamEnabled = original.isStreamEnabled(); + } + if (original.streamTimeout != null) { + this.streamTimeout = original.streamTimeout; + } + + } + + public void applyToWorker( + final JobWorkerBuilderStep1.JobWorkerBuilderStep3 workerBuilder) { + + if (taskTimeout != null) { + workerBuilder.timeout(taskTimeout); + } + applyToUserTaskWorker(workerBuilder); + + } + + public void applyToUserTaskWorker( + final JobWorkerBuilderStep1.JobWorkerBuilderStep3 workerBuilder) { + + if (pollInterval != null) { + workerBuilder.pollInterval(pollInterval); + } + if (pollRequestTimeout != null) { + workerBuilder.requestTimeout(pollRequestTimeout); + } + if (streamEnabled!= null) { + workerBuilder.streamEnabled(streamEnabled); + } + if (streamTimeout != null) { + workerBuilder.streamTimeout(streamTimeout); + } + + } + + private Duration taskTimeout = null; + + private Duration pollInterval = null; + + private Duration pollRequestTimeout = null; + + private Boolean streamEnabled = null; + + private Duration streamTimeout = null; + + public Duration getTaskTimeout() { + return taskTimeout; + } + + public Duration getPollInterval() { + return pollInterval; + } + + public Duration getPollRequestTimeout() { + return pollRequestTimeout; + } + + public Boolean isStreamEnabled() { + return streamEnabled; + } + + public Duration getStreamTimeout() { + return streamTimeout; + } + + public void setTaskTimeout(Duration taskTimeout) { + this.taskTimeout = taskTimeout; + } + + public void setPollInterval(Duration pollInterval) { + this.pollInterval = pollInterval; + } + + public void setPollRequestTimeout(Duration pollRequestTimeout) { + this.pollRequestTimeout = pollRequestTimeout; + } + + public void setStreamEnabled(boolean streamEnabled) { + this.streamEnabled = streamEnabled; + } + + public void setStreamTimeout(Duration streamTimeout) { + this.streamTimeout = streamTimeout; + } + + } + + public static class TaskProperties { + + private Map adapters = Map.of(); + + public Map getAdapters() { + return adapters; + } + + public void setAdapters(Map adapters) { + this.adapters = adapters; + } + } } diff --git a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskWiring.java b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskWiring.java index 1c2104a..c3da080 100644 --- a/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskWiring.java +++ b/spring-boot/src/main/java/io/vanillabp/camunda8/wiring/Camunda8TaskWiring.java @@ -34,7 +34,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.function.Consumer; import java.util.stream.Stream; @@ -103,16 +102,22 @@ public void accept( public void openWorkers() { // fetch all usertasks spawned - if(!userTaskTenantIds.isEmpty()){ - workers.add( - client + userTaskTenantIds + .stream() + .map(workflowModuleId -> { + final var tenantId = camunda8Properties.getTenantId(workflowModuleId); + final var userTaskWorker = client .newWorker() .jobType("io.camunda.zeebe:userTask") .handler(userTaskHandler) .timeout(Integer.MAX_VALUE) // user-tasks are not fetched more than once .name(workerId) - .tenantIds(userTaskTenantIds.stream().filter(Objects::nonNull).toList())); - } + .tenantId(tenantId); + final var workerProperties = camunda8Properties.getUserTaskWorkerProperties(workflowModuleId); + workerProperties.applyToUserTaskWorker(userTaskWorker); + return userTaskWorker; + }) + .forEach(workers::add); workers .forEach(JobWorkerBuilderStep3::open); @@ -242,45 +247,29 @@ protected void connectToBpms( connectable.getBpmnProcessId(), connectable.getElementId(), taskHandler); - userTaskTenantIds.add(tenantId); + userTaskTenantIds.add(workflowModuleId); return; } final var variablesToFetch = getVariablesToFetch(idPropertyName, parameters); - final var worker = client .newWorker() .jobType(connectable.getTaskDefinition()) .handler(taskHandler) .name(workerId) .fetchVariables(variablesToFetch); + + final var workerProperties = camunda8Properties.getWorkerProperties( + workflowModuleId, + connectable.getBpmnProcessId(), + connectable.getTaskDefinition()); + workerProperties.applyToWorker(worker); + workers.add( tenantId != null ? worker.tenantId(tenantId) : worker); - - // using defaults from config if null, 0 or negative -// if (zeebeWorkerValue.getName() != null && zeebeWorkerValue.getName().length() > 0) { -// builder.name(zeebeWorkerValue.getName()); -// } else { -// builder.name(beanInfo.getBeanName() + "#" + zeebeWorkerValue.getMethodInfo().getMethodName()); -// } -// if (zeebeWorkerValue.getMaxJobsActive() > 0) { -// builder.maxJobsActive(zeebeWorkerValue.getMaxJobsActive()); -// } -// if (zeebeWorkerValue.getTimeout() > 0) { -// builder.timeout(zeebeWorkerValue.getTimeout()); -// } -// if (zeebeWorkerValue.getPollInterval() > 0) { -// builder.pollInterval(Duration.ofMillis(zeebeWorkerValue.getPollInterval())); -// } -// if (zeebeWorkerValue.getRequestTimeout() > 0) { -// builder.requestTimeout(Duration.ofSeconds(zeebeWorkerValue.getRequestTimeout())); -// } -// if (zeebeWorkerValue.getFetchVariables().length > 0) { -// builder.fetchVariables(zeebeWorkerValue.getFetchVariables()); -// } }