diff --git a/README.md b/README.md
index 4cc8f4fb1d..0ac3377164 100644
--- a/README.md
+++ b/README.md
@@ -141,6 +141,12 @@ A Serverless Workflow service that works as a Github bot application, which reac
* [on Quarkus](serverless-workflow-examples/serverless-workflow-github-showcase)
+## Serverless Workflow Correlation
+
+* [on Quarkus (JDBC)](serverless-workflow-examples/serverless-workflow-correlation-quarkus)
+* [on Quarkus (MongoDB)](serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus)
+
+
## Other Misc Examples
- Onboarding example combining 1 process and two decision services: see [README.md](kogito-quarkus-examples/onboarding-example/README.md)
diff --git a/serverless-workflow-examples/pom.xml b/serverless-workflow-examples/pom.xml
index 962cbd298a..c9bb9cf6f8 100644
--- a/serverless-workflow-examples/pom.xml
+++ b/serverless-workflow-examples/pom.xml
@@ -49,6 +49,7 @@
serverless-workflow-compensation-quarkus
serverless-workflow-consuming-events-over-http-quarkus
serverless-workflow-correlation-quarkus
+ serverless-workflow-correlation-quarkus-mongodb
serverless-workflow-custom-function-knative
serverless-workflow-custom-type
serverless-workflow-data-index-persistence-addon-quarkus
diff --git a/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/README.md b/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/README.md
new file mode 100644
index 0000000000..d901964f75
--- /dev/null
+++ b/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/README.md
@@ -0,0 +1,154 @@
+# Kogito Serverless Workflow - Correlation with Callback Example
+
+## Description
+
+This example contains a workflow service to demonstrate correlation feature using callback states and events.
+Each callback state withing the workflow publishes an event and wait for a response event,
+there is an incoming event, it is matched with the proper workflow instance by using the correlation attribute, in this case it is the `userid`. So for every incoming event the userid is used to properly find and trigger the proper workflow instance. The correlation is defined in the [workflow definition file](src/main/resources/correlation.sw.json) that is described using JSON format as defined in the [CNCF Serverless Workflow specification](https://github.com/serverlessworkflow/specification).
+
+```json
+"correlation": [
+ {
+ "contextAttributeName": "userid"
+ }
+]
+```
+Events should be in CloudEvent format and the correlation attribute should be defined as an extension attribute, in this case `userid`.
+
+The workflow example is started by events as well, so a start event should be published with the same correlation attribute `userid, that will be used to match correlations for the started workflow instance.
+
+In the example the event broker used to publish/receive the events is Kafka, and the used topics are the same described as the event types in the workflow definition.
+
+
+```json
+{
+ "name": "newAccountEvent",
+ "source": "",
+ "type": "newAccountEventType",
+ "correlation": [
+ {
+ "contextAttributeName": "userid"
+ }
+ ]
+}
+```
+For simplicity, the events are published and consumed in the same application running the workflow, but in a real use case they should come from different services interacting with the workflow, see [EventsService](src/main/java/org/kie/kogito/examples/EventsService.java).
+
+To start the workflow as mentioned, it is required an event to be published which is going to be consumed by the workflow service starting a new instance. A helper REST endpoint was recreated to simplify this step, so once a POST request is received it publishes the start event to the broker see [WorkflowResource](src/main/java/org/kie/kogito/examples/WorkflowResource.java).
+
+All eventing configuration and the broker parameters are in done in the [application.properties](src/main/resources/application.properties).
+
+## Infrastructure requirements
+
+### Kafka
+
+This quickstart requires an Apache Kafka to be available and by default expects it to be on default port and localhost.
+
+* Install and Startup Kafka Server / Zookeeper
+
+https://kafka.apache.org/quickstart
+
+To publish and consume the event, topic "move" is used.
+
+Optionally and for convenience, a docker-compose [configuration file](docker-compose/docker-compose.yml) is
+provided in the path [docker-compose/](docker-compose/), where you can just run the command from there:
+
+```sh
+docker-compose up
+```
+
+In this way a container for Kafka will be started on port 9092.
+
+### MongoDB
+
+Alternatively, you can run this example using persistence with a MongoDB server.
+
+Configuration for setting up the connection can be found in [applications.properties](src/main/resources/application.properties) file, which
+follows the Quarkus MongoDB Client settings, for more information please check [MongoDB Client Configuration Reference](https://quarkus.io/guides/mongodb#configuration-reference).
+
+Optionally and for convenience, a docker-compose [configuration file](docker-compose/docker-compose.yml) is
+provided in the path [docker-compose/](docker-compose/), where you can just run the command from there:
+
+```sh
+docker-compose up
+```
+
+## Installing and Running
+
+### Prerequisites
+
+You will need:
+ - Java 17+ installed
+ - Environment variable JAVA_HOME set accordingly
+ - Maven 3.9.6+ installed
+
+When using native image compilation, you will also need:
+ - [GraalVm](https://www.graalvm.org/downloads/) 19.3.1+ installed
+ - Environment variable GRAALVM_HOME set accordingly
+ - Note that GraalVM native image compilation typically requires other packages (glibc-devel, zlib-devel and gcc) to be installed too. You also need 'native-image' installed in GraalVM (using 'gu install native-image'). Please refer to [GraalVM installation documentation](https://www.graalvm.org/docs/reference-manual/aot-compilation/#prerequisites) for more details.
+
+### Compile and Run in Local Dev Mode
+
+```sh
+mvn clean package quarkus:dev
+```
+
+### Compile and Run in JVM mode
+
+```sh
+mvn clean package
+java -jar target/quarkus-app/quarkus-run.jar
+```
+
+or on Windows
+
+```sh
+mvn clean package
+java -jar target\quarkus-app\quarkus-run.jar
+```
+
+### Compile and Run in JVM mode using PostgreSQL persistence
+
+To enable persistence, please append `-Ppersistence` to your Maven command.
+That will ensure the correct dependencies are in place, and automatically set the required properties to connect
+with the PostgreSQL instance from the provided docker compose.
+
+```sh
+mvn clean package -Peristence
+```
+
+### Compile and Run using Local Native Image
+Note that this requires GRAALVM_HOME to point to a valid GraalVM installation
+
+```sh
+mvn clean package -Pnative
+```
+
+To run the generated native executable, generated in `target/`, execute
+
+```sh
+./target/serverless-workflow-correlation-quarkus-{version}-runner
+```
+
+### Start a workflow
+
+The service based on the JSON workflow definition can be access by sending a request to http://localhost:8080/account/{userid}
+
+Complete curl command can be found below:
+
+```sh
+curl -X POST -H 'Content-Type:application/json' -H 'Accept:application/json' http://localhost:8080/account/12345
+```
+
+After a while (note that to you need give time for event to be consumed) you should see the log message printed in the console, and the workflow is completed.
+
+```text
+2022-05-12 11:02:15,891 INFO [org.kie.kog.ser.eve.imp.ProcessEventDispatcher] (kogito-event-executor-0) Starting new process instance with signal 'newAccountEventType'
+2022-05-12 11:02:18,909 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-9) SRMSG18256: Initialize record store for topic-partition 'validateAccountEmail-0' at position 16.
+2022-05-12 11:02:18,919 INFO [org.kie.kog.exa.EventsService] (pool-1-thread-1) Validate Account received. Workflow data JsonCloudEventData{node={"email":"test@test.com","userId":"12345"}}
+2022-05-12 11:02:19,931 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-5) SRMSG18256: Initialize record store for topic-partition 'validatedAccountEmail-0' at position 16.
+2022-05-12 11:02:20,962 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-8) SRMSG18256: Initialize record store for topic-partition 'activateAccount-0' at position 16.
+2022-05-12 11:02:20,971 INFO [org.kie.kog.exa.EventsService] (pool-1-thread-1) Activate Account received. Workflow data JsonCloudEventData{node={"email":"test@test.com","userId":"12345"}}
+2022-05-12 11:02:21,994 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-6) SRMSG18256: Initialize record store for topic-partition 'activatedAccount-0' at position 7.
+2022-05-12 11:02:22,006 INFO [org.kie.kog.exa.EventsService] (kogito-event-executor-0) Complete Account Creation received. Workflow data {"email":"test@test.com","userId":"12345"}, KogitoProcessInstanceId 0cef0eef-06c8-4433-baea-505fa8d45f68
+```
\ No newline at end of file
diff --git a/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/docker-compose/docker-compose.yml b/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/docker-compose/docker-compose.yml
new file mode 100644
index 0000000000..24ea2caf84
--- /dev/null
+++ b/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/docker-compose/docker-compose.yml
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+version: "3"
+
+services:
+ zookeeper:
+ container_name: zookeeper
+ image: strimzi/kafka:0.20.1-kafka-2.6.0
+ command: [
+ "sh", "-c",
+ "bin/zookeeper-server-start.sh config/zookeeper.properties"
+ ]
+ ports:
+ - "2181:2181"
+ environment:
+ LOG_DIR: "/tmp/logs"
+
+ kafka:
+ image: strimzi/kafka:0.20.1-kafka-2.6.0
+ container_name: kafka
+ command: [
+ "sh", "-c",
+ "bin/kafka-server-start.sh config/server.properties --override inter.broker.listener.name=$${KAFKA_INTER_BROKER_LISTENER_NAME} --override listener.security.protocol.map=$${KAFKA_LISTENER_SECURITY_PROTOCOL_MAP} --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
+ ]
+ depends_on:
+ - zookeeper
+ ports:
+ - "9092:9092"
+ environment:
+ KAFKA_BROKER_ID: 0
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://kafka:9092
+ KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
+ KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ LOG_DIR: "/tmp/logs"
+
+ mongodb:
+ image: mongo:latest
+ restart: always
+ container_name: mongo
+ ports:
+ - "27017:27017"
+ networks:
+ - mongodb-compose-network
+ mongo-express:
+ image: mongo-express:latest
+ container_name: mongo_express
+ environment:
+ ME_CONFIG_MONGODB_ADMINUSERNAME: root
+ ME_CONFIG_MONGODB_ADMINPASSWORD: example
+ ME_CONFIG_MONGODB_URL: mongodb://mongo:27017/
+ ME_CONFIG_BASICAUTH: false
+ ports:
+ - "8081:8081"
+ depends_on:
+ - mongodb
+ networks:
+ - mongodb-compose-network
+
+networks:
+ mongodb-compose-network:
+ driver: bridge
+
+
+# curl -X POST -H 'Content-Type:application/json' -H 'Accept:application/json' http://localhost:8888/account/mirror && curl -X POST -H 'Content-Type:application/json' -H 'Accept:application/json' http://localhost:8080/account/mirror
\ No newline at end of file
diff --git a/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/pom.xml b/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/pom.xml
new file mode 100644
index 0000000000..07e78a31dc
--- /dev/null
+++ b/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/pom.xml
@@ -0,0 +1,199 @@
+
+
+
+ 4.0.0
+
+
+ org.kie.kogito.examples
+ serverless-workflow-examples-parent
+ 999-SNAPSHOT
+ ../serverless-workflow-examples-parent/pom.xml
+
+
+ org.kie.kogito.examples
+ serverless-workflow-correlation-quarkus-mongodb
+ 1.0-SNAPSHOT
+
+ Kogito Example :: Serverless Workflow Correlation :: Quarkus :: MongoDB
+ Kogito Serverless Workflow Correlation Example - Quarkus - MongoDB
+
+ 3.8.4
+ quarkus-bom
+ io.quarkus
+ 3.8.4
+ org.kie.kogito
+ kogito-bom
+ 999-SNAPSHOT
+ 3.8.1
+ 17
+ 3.0.0-M7
+
+
+
+
+
+ ${quarkus.platform.group-id}
+ ${quarkus.platform.artifact-id}
+ ${quarkus.platform.version}
+ pom
+ import
+
+
+ ${kogito.bom.group-id}
+ ${kogito.bom.artifact-id}
+ ${kogito.bom.version}
+ pom
+ import
+
+
+
+
+
+ org.apache.kie.sonataflow
+ sonataflow-quarkus
+
+
+ org.kie
+ kie-addons-quarkus-messaging
+
+
+ io.quarkus
+ quarkus-smallrye-reactive-messaging-kafka
+
+
+ io.quarkus
+ quarkus-resteasy
+
+
+ io.quarkus
+ quarkus-resteasy-jackson
+
+
+ io.quarkus
+ quarkus-smallrye-health
+
+
+ org.kie
+ kie-addons-quarkus-source-files
+
+
+
+ io.quarkus
+ quarkus-junit5
+ test
+
+
+ io.rest-assured
+ rest-assured
+ test
+
+
+ org.awaitility
+ awaitility
+ test
+
+
+
+
+ ${project.artifactId}
+
+
+ maven-compiler-plugin
+ ${version.compiler.plugin}
+
+ ${maven.compiler.release}
+
+
+
+ ${quarkus.platform.group-id}
+ quarkus-maven-plugin
+ ${quarkus-plugin.version}
+ true
+
+
+
+ build
+ generate-code
+ generate-code-tests
+
+
+
+
+
+ maven-failsafe-plugin
+ ${version.failsafe.plugin}
+
+
+ org.jboss.logmanager.LogManager
+ ${maven.home}
+
+
+
+
+
+ integration-test
+ verify
+
+
+
+
+
+
+
+
+ container
+
+
+ container
+
+
+
+ container
+
+
+
+ io.quarkus
+ quarkus-container-image-jib
+
+
+
+
+ persistence
+
+
+ persistence
+
+
+
+
+ org.kie
+ kie-addons-quarkus-persistence-mongodb
+
+
+ io.quarkus
+ quarkus-mongodb-client
+
+
+
+
+
diff --git a/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/src/main/java/org/kie/kogito/examples/Account.java b/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/src/main/java/org/kie/kogito/examples/Account.java
new file mode 100644
index 0000000000..5da1f57541
--- /dev/null
+++ b/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/src/main/java/org/kie/kogito/examples/Account.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.examples;
+
+public class Account {
+
+ private String email;
+ private String userId;
+
+ public Account() {
+ }
+
+ public Account(String email, String userId) {
+ this.email = email;
+ this.userId = userId;
+ }
+
+ public String getEmail() {
+ return email;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+}
diff --git a/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/src/main/java/org/kie/kogito/examples/EventsService.java b/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/src/main/java/org/kie/kogito/examples/EventsService.java
new file mode 100644
index 0000000000..004e81d958
--- /dev/null
+++ b/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/src/main/java/org/kie/kogito/examples/EventsService.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.examples;
+
+import java.net.URI;
+import java.time.OffsetDateTime;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
+import org.eclipse.microprofile.reactive.messaging.Acknowledgment.Strategy;
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.reactive.messaging.Message;
+import org.eclipse.microprofile.reactive.messaging.Outgoing;
+import org.kie.kogito.event.cloudevents.utils.CloudEventUtils;
+import org.kie.kogito.internal.process.runtime.KogitoProcessContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.jackson.JsonCloudEventData;
+
+@ApplicationScoped
+public class EventsService {
+
+ private static final Logger logger = LoggerFactory.getLogger(EventsService.class);
+
+ @Inject
+ ObjectMapper objectMapper;
+
+ private Map accounts = new ConcurrentHashMap<>();
+
+ public void complete(JsonNode workflowData, KogitoProcessContext context) {
+ logger.info("Complete Account Creation received. Workflow data {}, KogitoProcessInstanceId {} ", workflowData, context.getProcessInstance().getStringId());
+ }
+
+ @Incoming("validate")
+ @Outgoing("validated")
+ @Acknowledgment(Strategy.POST_PROCESSING)
+ public String onEventValidate(Message message) {
+ Optional ce = CloudEventUtils.decode(message.getPayload());
+ JsonCloudEventData cloudEventData = (JsonCloudEventData) ce.get().getData();
+ logger.info("Validate Account received. Workflow data {}", cloudEventData);
+ String userId = ce.get().getExtension("userid").toString();
+
+ //just for testing
+ accounts.put(userId, ce.get().getExtension("kogitoprocinstanceid").toString());
+
+ return generateCloudEvent(userId, "validatedAccountEmail", null);
+ }
+
+ @Incoming("activate")
+ @Outgoing("activated")
+ @Acknowledgment(Strategy.POST_PROCESSING)
+ public String onEventActivate(Message message) {
+ Optional ce = CloudEventUtils.decode(message.getPayload());
+ JsonCloudEventData cloudEventData = (JsonCloudEventData) ce.get().getData();
+ logger.info("Activate Account received. Workflow data {}", cloudEventData);
+ return generateCloudEvent(ce.get().getExtension("userid").toString(), "activatedAccount", null);
+ }
+
+ private String generateCloudEvent(String id, String type, Object data) {
+ try {
+ return objectMapper.writeValueAsString(CloudEventBuilder.v1()
+ .withId(UUID.randomUUID().toString())
+ .withSource(URI.create(""))
+ .withType(type)
+ .withTime(OffsetDateTime.now())
+ .withExtension("userid", id)
+ .withData(objectMapper.writeValueAsBytes(data))
+ .build());
+ } catch (JsonProcessingException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ public final String getAccount(String userId) {
+ return accounts.get(userId);
+ }
+}
diff --git a/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/src/main/java/org/kie/kogito/examples/WorkflowResource.java b/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/src/main/java/org/kie/kogito/examples/WorkflowResource.java
new file mode 100644
index 0000000000..de6af279ab
--- /dev/null
+++ b/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/src/main/java/org/kie/kogito/examples/WorkflowResource.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.examples;
+
+import java.net.URI;
+import java.time.OffsetDateTime;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+
+import jakarta.inject.Inject;
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.POST;
+import jakarta.ws.rs.Path;
+import jakarta.ws.rs.PathParam;
+import jakarta.ws.rs.core.Response;
+
+import org.eclipse.microprofile.reactive.messaging.Channel;
+import org.eclipse.microprofile.reactive.messaging.Emitter;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.cloudevents.core.builder.CloudEventBuilder;
+
+/**
+ * Helper class used to facilitate testing using REST
+ */
+@Path("/account")
+public class WorkflowResource {
+
+ @Inject
+ ObjectMapper objectMapper;
+
+ @Channel("start")
+ Emitter emitter;
+
+ @Inject
+ EventsService eventsService;
+
+ @POST
+ @Path("/{userId}")
+ public Response onEvent(@PathParam("userId") String userId) {
+ String start = generateCloudEvent(userId, "newAccountEventType");
+ emitter.send(start);
+ return Response.status(Response.Status.CREATED).build();
+ }
+
+ @GET
+ @Path("/{userId}")
+ public Map getProcessInstanceId(@PathParam("userId") String userId) {
+ return Collections.singletonMap("processInstanceId", eventsService.getAccount(userId));
+ }
+
+ private String generateCloudEvent(String id, String type) {
+ try {
+ return objectMapper.writeValueAsString(CloudEventBuilder.v03()
+ .withId(UUID.randomUUID().toString())
+ .withSource(URI.create(""))
+ .withType(type)
+ .withTime(OffsetDateTime.now())
+ .withExtension("userid", id)
+ .withData(objectMapper.writeValueAsBytes(new Account("test@test.com", id)))
+ .build());
+ } catch (JsonProcessingException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+}
diff --git a/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/src/main/resources/application.properties b/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/src/main/resources/application.properties
new file mode 100644
index 0000000000..2996230c52
--- /dev/null
+++ b/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/src/main/resources/application.properties
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+%prod.kafka.bootstrap.servers=localhost:9092
+
+#start the workflow events
+##application channels
+mp.messaging.outgoing.start.connector=smallrye-kafka
+mp.messaging.outgoing.start.topic=newAccountEventType
+mp.messaging.outgoing.start.value.serializer=org.apache.kafka.common.serialization.StringSerializer
+mp.messaging.outgoing.start.group.id=kogito-sw-callback
+
+##workflow channels
+mp.messaging.incoming.newAccountEventType.connector=smallrye-kafka
+mp.messaging.incoming.newAccountEventType.topic=newAccountEventType
+mp.messaging.incoming.newAccountEventType.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+mp.messaging.incoming.newAccountEventType.group.id=kogito-sw-callback
+mp.messaging.incoming.newAccountEventType.auto.offset.reset=earliest
+
+#activate account events
+##application channels
+mp.messaging.outgoing.activated.connector=smallrye-kafka
+mp.messaging.outgoing.activated.value.serializer=org.apache.kafka.common.serialization.StringSerializer
+mp.messaging.outgoing.activated.topic=activatedAccount
+mp.messaging.outgoing.activated.group.id=kogito-sw-callback
+
+mp.messaging.incoming.activate.connector=smallrye-kafka
+mp.messaging.incoming.activate.topic=activateAccount
+mp.messaging.incoming.activate.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+mp.messaging.incoming.activate.group.id=kogito-sw-callback
+mp.messaging.incoming.activate.auto.offset.reset=earliest
+
+##workflow channels
+mp.messaging.incoming.activatedAccount.connector=smallrye-kafka
+mp.messaging.incoming.activatedAccount.topic=activatedAccount
+mp.messaging.incoming.activatedAccount.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+mp.messaging.incoming.activatedAccount.group.id=kogito-sw-callback
+mp.messaging.incoming.activatedAccount.auto.offset.reset=earliest
+
+mp.messaging.outgoing.activateAccount.connector=smallrye-kafka
+mp.messaging.outgoing.activateAccount.value.serializer=org.apache.kafka.common.serialization.StringSerializer
+mp.messaging.outgoing.activateAccount.topic=activateAccount
+mp.messaging.outgoing.activateAccount.group.id=kogito-sw-callback
+
+#validate email events
+##application channels
+mp.messaging.incoming.validate.connector=smallrye-kafka
+mp.messaging.incoming.validate.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+mp.messaging.incoming.validate.topic=validateAccountEmail
+mp.messaging.incoming.validate.group.id=kogito-sw-callback
+mp.messaging.incoming.validate.auto.offset.reset=earliest
+
+mp.messaging.outgoing.validated.connector=smallrye-kafka
+mp.messaging.outgoing.validated.topic=validatedAccountEmail
+mp.messaging.outgoing.validated.value.serializer=org.apache.kafka.common.serialization.StringSerializer
+mp.messaging.outgoing.validated.group.id=kogito-sw-callback
+
+##workflow channels
+mp.messaging.outgoing.validateAccountEmail.connector=smallrye-kafka
+mp.messaging.outgoing.validateAccountEmail.topic=validateAccountEmail
+mp.messaging.outgoing.validateAccountEmail.value.serializer=org.apache.kafka.common.serialization.StringSerializer
+mp.messaging.outgoing.validateAccountEmail.group.id=kogito-sw-callback
+
+mp.messaging.incoming.validatedAccountEmail.connector=smallrye-kafka
+mp.messaging.incoming.validatedAccountEmail.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+mp.messaging.incoming.validatedAccountEmail.topic=validatedAccountEmail
+mp.messaging.incoming.validatedAccountEmail.group.id=kogito-sw-callback
+mp.messaging.incoming.validatedAccountEmail.auto.offset.reset=earliest
+
+#Persistence configuration
+kogito.persistence.type=mongodb
+
+quarkus.mongodb.database=kogito
+%prod.quarkus.mongodb.database=kogito
+
+kogito.persistence.proto.marshaller=true
+
+quarkus.grpc.dev-mode.force-server-start=false
+
+# profile to pack this example into a container, to use it execute activate the maven container profile, -Dcontainer
+%container.quarkus.container-image.build=true
+%container.quarkus.container-image.push=false
+%container.quarkus.container-image.group=${USER}
+%container.quarkus.container-image.registry=dev.local
+%container.quarkus.container-image.tag=1.0-SNAPSHOT
diff --git a/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/src/main/resources/correlation.sw.json b/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/src/main/resources/correlation.sw.json
new file mode 100644
index 0000000000..b6a44aeb13
--- /dev/null
+++ b/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/src/main/resources/correlation.sw.json
@@ -0,0 +1,104 @@
+{
+ "id": "correlation",
+ "version": "1.0",
+ "name": "Workflow Correlation example",
+ "description": "An example of how to use correlation on events",
+ "start": "New User Account Request",
+ "events": [
+ {
+ "name": "newAccountEvent",
+ "source": "",
+ "type": "newAccountEventType",
+ "correlation": [
+ {
+ "contextAttributeName": "userid"
+ }
+ ]
+ },
+ {
+ "name": "validateAccountEmailEvent",
+ "source": "workflow",
+ "type": "validateAccountEmail"
+ },
+ {
+ "name": "validatedAccountEmailEvent",
+ "source": "workflow",
+ "type": "validatedAccountEmail",
+ "correlation": [
+ {
+ "contextAttributeName": "userid"
+ }
+ ]
+ },
+ {
+ "name": "activateAccountEvent",
+ "source": "workflow",
+ "type": "activateAccount"
+ },
+ {
+ "name": "activatedAccountEvent",
+ "source": "workflow",
+ "type": "activatedAccount",
+ "correlation": [
+ {
+ "contextAttributeName": "userid"
+ }
+ ]
+ }
+ ],
+ "functions": [
+ {
+ "name": "complete",
+ "type": "custom",
+ "operation": "service:java:org.kie.kogito.examples.EventsService::complete"
+ }
+ ],
+ "states": [
+ {
+ "name":"New User Account Request",
+ "type":"event",
+ "onEvents": [{
+ "eventRefs": ["newAccountEvent"]
+ }],
+ "transition": "Validate User Email"
+ },
+ {
+ "name": "Validate User Email",
+ "type": "callback",
+ "action": {
+ "name": "publish validate event",
+ "eventRef": {
+ "triggerEventRef": "validateAccountEmailEvent"
+ }
+ },
+ "eventRef": "validatedAccountEmailEvent",
+ "transition": "Activate User Account"
+ },
+{
+ "name": "Activate User Account",
+ "type": "callback",
+ "action": {
+ "name": "publish Activate Account event",
+ "eventRef": {
+ "triggerEventRef": "activateAccountEvent"
+ }
+ },
+ "eventRef": "activatedAccountEvent",
+ "transition": "Account Creation Completed"
+ },
+
+ {
+ "name": "Account Creation Completed",
+ "type": "operation",
+ "actions": [
+ {
+ "name": "accountCreationCompleted",
+ "functionRef": {
+ "refName": "complete"
+ }
+ }
+ ],
+ "end": true
+ }
+ ]
+}
diff --git a/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/src/test/java/org/kie/kogito/examples/CorrelationIT.java b/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/src/test/java/org/kie/kogito/examples/CorrelationIT.java
new file mode 100644
index 0000000000..9f8a24fcba
--- /dev/null
+++ b/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/src/test/java/org/kie/kogito/examples/CorrelationIT.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.examples;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.jupiter.api.Test;
+
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+import io.restassured.http.ContentType;
+
+import static io.restassured.RestAssured.given;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.notNullValue;
+
+@QuarkusIntegrationTest
+class CorrelationIT {
+
+ public static final String HEALTH_URL = "/q/health";
+ public static final int TIMEOUT = 2;
+
+ private String userId = "12345";
+
+ @Test
+ void testCorrelation() {
+ //health check - wait to be ready
+ await()
+ .atMost(TIMEOUT, MINUTES)
+ .pollDelay(2, SECONDS)
+ .pollInterval(1, SECONDS)
+ .untilAsserted(() -> given()
+ .contentType(ContentType.JSON)
+ .accept(ContentType.JSON)
+ .get(HEALTH_URL)
+ .then()
+ .statusCode(200));
+
+ //start workflow
+ given()
+ .contentType(ContentType.JSON)
+ .accept(ContentType.JSON)
+ .pathParam("userId", userId)
+ .post("/account/{userId}")
+ .then()
+ .statusCode(201);
+
+ //check instance created
+ AtomicReference processInstanceId = new AtomicReference<>();
+ await().atMost(TIMEOUT, MINUTES)
+ .pollInterval(1, SECONDS)
+ .untilAsserted(() -> processInstanceId.set(given()
+ .accept(ContentType.JSON)
+ .pathParam("userId", userId)
+ .get("/account/{userId}")
+ .then()
+ .statusCode(200)
+ .body("processInstanceId", notNullValue())
+ .extract()
+ .body().path("processInstanceId")));
+
+ //check instance completed
+ await()
+ .atMost(TIMEOUT, MINUTES)
+ .pollInterval(1, SECONDS)
+ .untilAsserted(() -> given()
+ .contentType(ContentType.JSON)
+ .accept(ContentType.JSON)
+ .pathParam("processInstanceId", processInstanceId.get())
+ .get("/correlation/{processInstanceId}")
+ .then()
+ .statusCode(404));
+ }
+}
diff --git a/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/src/test/resources/application.properties b/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/src/test/resources/application.properties
new file mode 100644
index 0000000000..50bf6c1154
--- /dev/null
+++ b/serverless-workflow-examples/serverless-workflow-correlation-quarkus-mongodb/src/test/resources/application.properties
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Quarkus
+quarkus.http.test-port=0
+
+# Temporary fix for test to pass due to issue in Quarkus classloading resolver
+quarkus.class-loading.parent-first-artifacts=org.testcontainers:testcontainers
\ No newline at end of file