diff --git a/serverless-workflow-examples/serverless-workflow-subflows-event/README.md b/serverless-workflow-examples/serverless-workflow-subflows-event/README.md new file mode 100644 index 0000000000..9737f73215 --- /dev/null +++ b/serverless-workflow-examples/serverless-workflow-subflows-event/README.md @@ -0,0 +1,86 @@ +# serverless-workflow-subflow-events + +This example illustrate how to trigger workflows manually with additional parameters calculated by an initial workflow. +The workflow responsible for setting up the parameters is executed as the start state. +Then, all possible workflows that might be instantiated with those parameters are registered using `event` state. `exclusive` property is set to false ensuring that the process instance remains active till all possible workflows has been executed. + +## Execution steps. + +Execute main workflow + +``` +curl --location 'http://localhost:8080/master' \ +--header 'Content-Type: application/json' \ +--data '{ +}' +``` + +This will return the id and the two properties that are configured by `setup` workflow + +``` +{ + "id": "ad7e1081-3f05-431e-b246-d9471643fec2", + "workflowdata": { + "param1": "This is param1", + "param2": "This is param2" + } +} +``` + +We need to write down the id returned by the previous steps and invoke `workflowA` through a cloud event containing that id as `kogitoprocrefid` attibute. + +``` +curl --location 'http://localhost:8080/executeA' \ +--header 'Content-Type: application/json' \ +--data '{ + "id" : "1", + "specversion" : "1.0", + "type" : "executeA", + "source" : "manual", + "data" : { + "param4" : "Additional parameter" + }, + "kogitoprocrefid" : "ad7e1081-3f05-431e-b246-d9471643fec2" +}' +``` + +The execution of `workflowA` is registered in the quarkus log. + +``` +2024-05-14 12:09:10,306 INFO [org.kie.kog.ser.wor.dev.DevModeServerlessWorkflowLogger] (kogito-event-executor-1) Triggered node 'Start' for process 'workflowA' (8321fbd0-64ee-4e95-91d6-957983a92325) +2024-05-14 12:09:10,306 INFO [org.kie.kog.ser.wor.dev.DevModeServerlessWorkflowLogger] (kogito-event-executor-1) Triggered node 'doIt' for process 'workflowA' (8321fbd0-64ee-4e95-91d6-957983a92325) +2024-05-14 12:09:10,307 INFO [org.kie.kog.ser.wor.dev.DevModeServerlessWorkflowLogger] (kogito-event-executor-1) Property 'workflowdata.param3' changed value from: 'null', to: '"This is workflow A"' +2024-05-14 12:09:10,307 INFO [org.kie.kog.ser.wor.dev.DevModeServerlessWorkflowLogger] (kogito-event-executor-1) Triggered node 'End' for process 'workflowA' (8321fbd0-64ee-4e95-91d6-957983a92325) +``` + +The main workflow is still active, waiting for execution of workflow B. Lets execute it sending another cloud event. + +``` +curl --location 'http://localhost:8080/executeB' \ +--header 'Content-Type: application/json' \ +--data '{ + "id": "1", + "specversion": "1.0", + "type": "executeB", + "source": "manual", + "data": { + "param4": "Additional parameter" + }, + "kogitoprocrefid": "ad7e1081-3f05-431e-b246-d9471643fec2" +}' +``` + +We see in quarkus logs that workflow B is executed and that master workflow is completed, since there are not more waiting events + +``` +2024-05-14 12:09:10,334 INFO [org.kie.kog.ser.wor.dev.DevModeServerlessWorkflowLogger] (kogito-event-executor-1) Triggered node 'Start' for process 'workflowB' (5a49f40d-2e54-46fb-8317-b0be12fd9f05) +2024-05-14 12:09:10,334 INFO [org.kie.kog.ser.wor.dev.DevModeServerlessWorkflowLogger] (kogito-event-executor-1) Triggered node 'doIt' for process 'workflowB' (5a49f40d-2e54-46fb-8317-b0be12fd9f05) +2024-05-14 12:09:10,335 INFO [org.kie.kog.ser.wor.dev.DevModeServerlessWorkflowLogger] (kogito-event-executor-1) Property 'workflowdata.param3' changed value from: 'null', to: '"This is workflow B"' +2024-05-14 12:09:10,335 INFO [org.kie.kog.ser.wor.dev.DevModeServerlessWorkflowLogger] (kogito-event-executor-1) Triggered node 'End' for process 'workflowB' (5a49f40d-2e54-46fb-8317-b0be12fd9f05) +2024-05-14 12:09:10,335 INFO [org.kie.kog.ser.wor.dev.DevModeServerlessWorkflowLogger] (kogito-event-executor-1) Workflow 'workflowB' (5a49f40d-2e54-46fb-8317-b0be12fd9f05) completed +2024-05-14 12:09:10,336 INFO [org.jbp.pro.cor.eve.EventTypeFilter] (kogito-event-executor-1) This event is subscribed to a message ref processInstanceCompleted:5a49f40d-2e54-46fb-8317-b0be12fd9f05 WorkflowProcessInstance [Id=5a49f40d-2e54-46fb-8317-b0be12fd9f05,processId=workflowB,state=2] +2024-05-14 12:09:10,336 INFO [org.jbp.pro.cor.eve.EventTypeFilter] (kogito-event-executor-1) This event is subscribed to a message ref processInstanceCompleted:5a49f40d-2e54-46fb-8317-b0be12fd9f05 WorkflowProcessInstance [Id=5a49f40d-2e54-46fb-8317-b0be12fd9f05,processId=workflowB,state=2] +2024-05-14 12:09:10,339 INFO [org.kie.kog.ser.wor.dev.DevModeServerlessWorkflowLogger] (kogito-event-executor-1) Triggered node 'waitForEventsJoin' for process 'master' (0ee42b37-7106-4157-9d75-00842f1fea45) +2024-05-14 12:09:10,339 INFO [org.kie.kog.ser.wor.dev.DevModeServerlessWorkflowLogger] (kogito-event-executor-1) Triggered node 'End' for process 'master' (0ee42b37-7106-4157-9d75-00842f1fea45) +2024-05-14 12:09:10,340 INFO [org.kie.kog.ser.wor.dev.DevModeServerlessWorkflowLogger] (kogito-event-executor-1) Triggered node 'End' for process 'master' (0ee42b37-7106-4157-9d75-00842f1fea45) +``` diff --git a/serverless-workflow-examples/serverless-workflow-subflows-event/pom.xml b/serverless-workflow-examples/serverless-workflow-subflows-event/pom.xml new file mode 100644 index 0000000000..a2a10a222c --- /dev/null +++ b/serverless-workflow-examples/serverless-workflow-subflows-event/pom.xml @@ -0,0 +1,192 @@ + + + + 4.0.0 + + + org.kie.kogito.examples + serverless-workflow-examples-parent + 999-SNAPSHOT + ../serverless-workflow-examples-parent/pom.xml + + + serverless-workflow-subflows-event + 1.0-SNAPSHOT + + Kogito Example :: Serverless Workflow Subflows Event :: Quarkus + Kogito Serverless Workflow Subflows Event - Quarkus + + + 3.8.4 + quarkus-bom + io.quarkus + 3.8.4 + org.kie.kogito + kogito-bom + 999-SNAPSHOT + 999-SNAPSHOT + 17 + 3.8.1 + 3.0.0-M7 + ${version.surefire.plugin} + + + + + + ${quarkus.platform.group-id} + ${quarkus.platform.artifact-id} + ${quarkus.platform.version} + pom + import + + + ${kogito.bom.group-id} + ${kogito.bom.artifact-id} + ${kogito.bom.version} + pom + import + + + + + + org.apache.kie.sonataflow + sonataflow-quarkus + + + io.quarkus + quarkus-resteasy-jackson + + + io.quarkus + quarkus-arc + + + io.quarkus + quarkus-resteasy + + + io.quarkus + quarkus-smallrye-openapi + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + org.awaitility + awaitility + test + + + + ${project.artifactId} + + + maven-compiler-plugin + ${version.compiler.plugin} + + ${maven.compiler.release} + + + + ${quarkus.platform.group-id} + quarkus-maven-plugin + ${quarkus-plugin.version} + + + + build + + + + + + maven-surefire-plugin + ${version.surefire.plugin} + + + org.jboss.logmanager.LogManager + ${maven.home} + + + + + maven-failsafe-plugin + ${version.failsafe.plugin} + + + org.jboss.logmanager.LogManager + ${maven.home} + + + + + + integration-test + verify + + + + + + + + + container + + + container + + + + container + + + + io.quarkus + quarkus-container-image-jib + + + + + native + + + native + + + + native + + + + diff --git a/serverless-workflow-examples/serverless-workflow-subflows-event/src/main/resources/application.properties b/serverless-workflow-examples/serverless-workflow-subflows-event/src/main/resources/application.properties new file mode 100644 index 0000000000..cd71c3734d --- /dev/null +++ b/serverless-workflow-examples/serverless-workflow-subflows-event/src/main/resources/application.properties @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +mp.messaging.incoming.executeA.connector=quarkus-http +mp.messaging.incoming.executeA.path=/executeA + +mp.messaging.incoming.executeB.connector=quarkus-http +mp.messaging.incoming.executeB.path=/executeB + +# profile to pack this example into a container, to use it execute activate the maven container profile, -Dcontainer +%container.quarkus.container-image.build=true +%container.quarkus.container-image.push=false +%container.quarkus.container-image.group=${USER} +%container.quarkus.container-image.registry=dev.local +%container.quarkus.container-image.tag=1.0-SNAPSHOT + +quarkus.devservices.enabled=false diff --git a/serverless-workflow-examples/serverless-workflow-subflows-event/src/main/resources/master.sw.json b/serverless-workflow-examples/serverless-workflow-subflows-event/src/main/resources/master.sw.json new file mode 100644 index 0000000000..22a681ab5d --- /dev/null +++ b/serverless-workflow-examples/serverless-workflow-subflows-event/src/main/resources/master.sw.json @@ -0,0 +1,66 @@ +{ + "id": "master", + "version": "1.0", + "specVersion": "0.8", + "name": "master", + "start": "setup", + "events": [ + { + "name": "executeA", + "source": "", + "type": "executeA" + }, + { + "name": "executeB", + "source": "", + "type": "executeB" + } + ], + "states": [ + { + "name": "setup", + "type" : "operation", + "actions": [{ + "name": "setup", + "subFlowRef" : "setup" + }], + "transition": "waitForEvents" + }, + { + "name": "waitForEvents", + "type": "event", + "onEvents": [ + { + "eventRefs": [ + "executeA" + ], + "actions": [ + { + "name": "workflowA", + "subFlowRef": "workflowA", + "actionDataFilter" : { + "useResults": false + } + } + ] + }, + { + "eventRefs": [ + "executeB" + ], + "actions": [ + { + "name": "workflowB", + "subFlowRef": "workflowB", + "actionDataFilter" : { + "useResults": false + } + } + ] + } + ], + "exclusive": false, + "end" : true + } + ] +} \ No newline at end of file diff --git a/serverless-workflow-examples/serverless-workflow-subflows-event/src/main/resources/setup.sw.json b/serverless-workflow-examples/serverless-workflow-subflows-event/src/main/resources/setup.sw.json new file mode 100644 index 0000000000..7659e1f1bd --- /dev/null +++ b/serverless-workflow-examples/serverless-workflow-subflows-event/src/main/resources/setup.sw.json @@ -0,0 +1,18 @@ +{ + "id": "setup", + "version": "1.0", + "specVersion": "0.8", + "name": "setup", + "start": "doIt", + "states": [ + { + "name": "doIt", + "type": "inject", + "data" : { + "param1": "This is param1", + "param2": "This is param2" + }, + "end": true + } + ] +} \ No newline at end of file diff --git a/serverless-workflow-examples/serverless-workflow-subflows-event/src/main/resources/workflowA.sw.json b/serverless-workflow-examples/serverless-workflow-subflows-event/src/main/resources/workflowA.sw.json new file mode 100644 index 0000000000..4d626a9375 --- /dev/null +++ b/serverless-workflow-examples/serverless-workflow-subflows-event/src/main/resources/workflowA.sw.json @@ -0,0 +1,17 @@ +{ + "id": "workflowA", + "version": "1.0", + "specVersion": "0.8", + "name": "workflowB", + "start": "doIt", + "states": [ + { + "name": "doIt", + "type": "inject", + "data" : { + "param3": "This is workflow A" + }, + "end": true + } + ] +} \ No newline at end of file diff --git a/serverless-workflow-examples/serverless-workflow-subflows-event/src/main/resources/workflowB.sw.json b/serverless-workflow-examples/serverless-workflow-subflows-event/src/main/resources/workflowB.sw.json new file mode 100644 index 0000000000..43f7c19df4 --- /dev/null +++ b/serverless-workflow-examples/serverless-workflow-subflows-event/src/main/resources/workflowB.sw.json @@ -0,0 +1,17 @@ +{ + "id": "workflowB", + "version": "1.0", + "specVersion": "0.8", + "name": "workflowB", + "start": "doIt", + "states": [ + { + "name": "doIt", + "type": "inject", + "data" : { + "param3": "This is workflow B" + }, + "end": true + } + ] +} \ No newline at end of file diff --git a/serverless-workflow-examples/serverless-workflow-subflows-event/src/test/java/org/kie/kogito/examples/MasterWorkflowTest.java b/serverless-workflow-examples/serverless-workflow-subflows-event/src/test/java/org/kie/kogito/examples/MasterWorkflowTest.java new file mode 100644 index 0000000000..cdbd05335d --- /dev/null +++ b/serverless-workflow-examples/serverless-workflow-subflows-event/src/test/java/org/kie/kogito/examples/MasterWorkflowTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.examples; + +import static io.restassured.RestAssured.given; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; + +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.time.OffsetDateTime; +import java.util.Collections; +import java.util.Optional; +import java.util.UUID; + +import org.junit.jupiter.api.Test; +import org.kie.kogito.event.CloudEventMarshaller; +import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants; +import org.kie.kogito.event.impl.StringCloudEventMarshaller; +import org.kie.kogito.jackson.utils.ObjectMapperFactory; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.http.ContentType; + + +@QuarkusTest +class MasterWorkflowTest { + private static final CloudEventMarshaller marshaller = new StringCloudEventMarshaller(ObjectMapperFactory.get()); + + @Test + void testPartialParallelRest() throws IOException { + String id = given() + .contentType(ContentType.JSON) + .accept(ContentType.JSON) + .body("{}").when() + .post("/master") + .then() + .statusCode(201).extract().path("id"); + sendEvent (id, "executeA"); + sendEvent (id, "executeB"); + waitForFinish("master", id, Duration.ofSeconds(10)); + } + + static void waitForFinish(String flowName, String id, Duration duration) { + await("dead").atMost(duration) + .with().pollInterval(1, SECONDS) + .untilAsserted(() -> given() + .contentType(ContentType.JSON) + .accept(ContentType.JSON) + .get("/" + flowName + "/{id}", id) + .then() + .statusCode(404)); + } + + private void sendEvent(String id, String eventType) throws IOException { + given() + .contentType(ContentType.JSON) + .when() + .body(marshaller.marshall(buildCloudEvent(id, eventType, marshaller))) + .post("/" + eventType) + .then() + .statusCode(202); + } + + static CloudEvent buildCloudEvent(String id, Optional businessKey, String type, CloudEventMarshaller marshaller) { + io.cloudevents.core.v1.CloudEventBuilder builder = CloudEventBuilder.v1() + .withId(UUID.randomUUID().toString()) + .withSource(URI.create("")) + .withType(type) + .withTime(OffsetDateTime.now()) + .withData(marshaller.cloudEventDataFactory().apply(Collections.singletonMap("param4", "Additional argument"))); + businessKey.ifPresentOrElse(key -> builder.withExtension(CloudEventExtensionConstants.BUSINESS_KEY, key), () -> builder.withExtension(CloudEventExtensionConstants.PROCESS_REFERENCE_ID, id)); + return builder.build(); + } + + static CloudEvent buildCloudEvent(String id, String type, CloudEventMarshaller marshaller) { + return buildCloudEvent(id, Optional.empty(), type, marshaller); + } + +}