Skip to content

Commit

Permalink
Add mongodb correlation for Quarkus example
Browse files Browse the repository at this point in the history
  • Loading branch information
mcruzdev committed Aug 20, 2024
1 parent 00da2e0 commit e8b21d7
Show file tree
Hide file tree
Showing 11 changed files with 798 additions and 0 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ A Serverless Workflow service that works as a Github bot application, which reac

* [on Quarkus](serverless-workflow-examples/serverless-workflow-github-showcase)

## Serverless Workflow Correlation

* [on Quarkus (JDBC)](serverless-workflow-examples/serverless-workflow-correlation-quarkus)
* [on Quarkus (MongoDB)](serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus)


## Other Misc Examples

- Onboarding example combining 1 process and two decision services: see [README.md](kogito-quarkus-examples/onboarding-example/README.md)
Expand Down
1 change: 1 addition & 0 deletions serverless-workflow-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
<module>serverless-workflow-compensation-quarkus</module>
<module>serverless-workflow-consuming-events-over-http-quarkus</module>
<module>serverless-workflow-correlation-quarkus</module>
<module>serverless-workflow-correlation-mongodb-quarkus</module>
<module>serverless-workflow-custom-function-knative</module>
<module>serverless-workflow-custom-type</module>
<module>serverless-workflow-data-index-persistence-addon-quarkus</module>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
# Kogito Serverless Workflow - Correlation with Callback Example

## Description

This example contains a workflow service to demonstrate correlation feature using callback states and events.
Each callback state withing the workflow publishes an event and wait for a response event,
there is an incoming event, it is matched with the proper workflow instance by using the correlation attribute, in this case it is the `userid`. So for every incoming event the userid is used to properly find and trigger the proper workflow instance. The correlation is defined in the [workflow definition file](src/main/resources/correlation.sw.json) that is described using JSON format as defined in the [CNCF Serverless Workflow specification](https://github.com/serverlessworkflow/specification).

```json
"correlation": [
{
"contextAttributeName": "userid"
}
]
```
Events should be in CloudEvent format and the correlation attribute should be defined as an extension attribute, in this case `userid`.

The workflow example is started by events as well, so a start event should be published with the same correlation attribute `userid, that will be used to match correlations for the started workflow instance.

In the example the event broker used to publish/receive the events is Kafka, and the used topics are the same described as the event types in the workflow definition.


```json
{
"name": "newAccountEvent",
"source": "",
"type": "newAccountEventType",
"correlation": [
{
"contextAttributeName": "userid"
}
]
}
```
For simplicity, the events are published and consumed in the same application running the workflow, but in a real use case they should come from different services interacting with the workflow, see [EventsService](src/main/java/org/kie/kogito/examples/EventsService.java).

To start the workflow as mentioned, it is required an event to be published which is going to be consumed by the workflow service starting a new instance. A helper REST endpoint was recreated to simplify this step, so once a POST request is received it publishes the start event to the broker see [WorkflowResource](src/main/java/org/kie/kogito/examples/WorkflowResource.java).

All eventing configuration and the broker parameters are in done in the [application.properties](src/main/resources/application.properties).

## Infrastructure requirements

### Kafka

This quickstart requires an Apache Kafka to be available and by default expects it to be on default port and localhost.

* Install and Startup Kafka Server / Zookeeper

https://kafka.apache.org/quickstart

To publish and consume the event, topic "move" is used.

Optionally and for convenience, a docker-compose [configuration file](docker-compose/docker-compose.yml) is
provided in the path [docker-compose/](docker-compose/), where you can just run the command from there:

```sh
docker-compose up
```

In this way a container for Kafka will be started on port 9092.

### MongoDB

Alternatively, you can run this example using persistence with a MongoDB server.

Configuration for setting up the connection can be found in [applications.properties](src/main/resources/application.properties) file, which
follows the Quarkus MongoDB Client settings, for more information please check [MongoDB Client Configuration Reference](https://quarkus.io/guides/mongodb#configuration-reference).

Optionally and for convenience, a docker-compose [configuration file](docker-compose/docker-compose.yml) is
provided in the path [docker-compose/](docker-compose/), where you can just run the command from there:

```sh
docker-compose up
```

## Installing and Running

### Prerequisites

You will need:
- Java 17+ installed
- Environment variable JAVA_HOME set accordingly
- Maven 3.9.6+ installed

When using native image compilation, you will also need:
- [GraalVm](https://www.graalvm.org/downloads/) 19.3.1+ installed
- Environment variable GRAALVM_HOME set accordingly
- Note that GraalVM native image compilation typically requires other packages (glibc-devel, zlib-devel and gcc) to be installed too. You also need 'native-image' installed in GraalVM (using 'gu install native-image'). Please refer to [GraalVM installation documentation](https://www.graalvm.org/docs/reference-manual/aot-compilation/#prerequisites) for more details.

### Compile and Run in Local Dev Mode

```sh
mvn clean package quarkus:dev
```

### Compile and Run in JVM mode

```sh
mvn clean package
java -jar target/quarkus-app/quarkus-run.jar
```

or on Windows

```sh
mvn clean package
java -jar target\quarkus-app\quarkus-run.jar
```

### Compile and Run in JVM mode using PostgreSQL persistence

To enable persistence, please append `-Ppersistence` to your Maven command.
That will ensure the correct dependencies are in place, and automatically set the required properties to connect
with the PostgreSQL instance from the provided docker compose.

```sh
mvn clean package -Peristence
```

### Compile and Run using Local Native Image
Note that this requires GRAALVM_HOME to point to a valid GraalVM installation

```sh
mvn clean package -Pnative
```

To run the generated native executable, generated in `target/`, execute

```sh
./target/serverless-workflow-correlation-quarkus-{version}-runner
```

### Start a workflow

The service based on the JSON workflow definition can be access by sending a request to http://localhost:8080/account/{userid}

Complete curl command can be found below:

```sh
curl -X POST -H 'Content-Type:application/json' -H 'Accept:application/json' http://localhost:8080/account/12345
```

After a while (note that to you need give time for event to be consumed) you should see the log message printed in the console, and the workflow is completed.

```text
2022-05-12 11:02:15,891 INFO [org.kie.kog.ser.eve.imp.ProcessEventDispatcher] (kogito-event-executor-0) Starting new process instance with signal 'newAccountEventType'
2022-05-12 11:02:18,909 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-9) SRMSG18256: Initialize record store for topic-partition 'validateAccountEmail-0' at position 16.
2022-05-12 11:02:18,919 INFO [org.kie.kog.exa.EventsService] (pool-1-thread-1) Validate Account received. Workflow data JsonCloudEventData{node={"email":"test@test.com","userId":"12345"}}
2022-05-12 11:02:19,931 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-5) SRMSG18256: Initialize record store for topic-partition 'validatedAccountEmail-0' at position 16.
2022-05-12 11:02:20,962 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-8) SRMSG18256: Initialize record store for topic-partition 'activateAccount-0' at position 16.
2022-05-12 11:02:20,971 INFO [org.kie.kog.exa.EventsService] (pool-1-thread-1) Activate Account received. Workflow data JsonCloudEventData{node={"email":"test@test.com","userId":"12345"}}
2022-05-12 11:02:21,994 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-6) SRMSG18256: Initialize record store for topic-partition 'activatedAccount-0' at position 7.
2022-05-12 11:02:22,006 INFO [org.kie.kog.exa.EventsService] (kogito-event-executor-0) Complete Account Creation received. Workflow data {"email":"test@test.com","userId":"12345"}, KogitoProcessInstanceId 0cef0eef-06c8-4433-baea-505fa8d45f68
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

version: "3"

services:
zookeeper:
container_name: zookeeper
image: strimzi/kafka:0.20.1-kafka-2.6.0
command: [
"sh", "-c",
"bin/zookeeper-server-start.sh config/zookeeper.properties"
]
ports:
- "2181:2181"
environment:
LOG_DIR: "/tmp/logs"

kafka:
image: strimzi/kafka:0.20.1-kafka-2.6.0
container_name: kafka
command: [
"sh", "-c",
"bin/kafka-server-start.sh config/server.properties --override inter.broker.listener.name=$${KAFKA_INTER_BROKER_LISTENER_NAME} --override listener.security.protocol.map=$${KAFKA_LISTENER_SECURITY_PROTOCOL_MAP} --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
]
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 0
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://kafka:9092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
LOG_DIR: "/tmp/logs"

mongodb:
image: mongo:latest
restart: always
container_name: mongo
ports:
- "27017:27017"
networks:
- mongodb-compose-network
mongo-express:
image: mongo-express:latest
container_name: mongo_express
environment:
ME_CONFIG_MONGODB_ADMINUSERNAME: root
ME_CONFIG_MONGODB_ADMINPASSWORD: example
ME_CONFIG_MONGODB_URL: mongodb://mongo:27017/
ME_CONFIG_BASICAUTH: false
ports:
- "8081:8081"
depends_on:
- mongodb
networks:
- mongodb-compose-network

networks:
mongodb-compose-network:
driver: bridge


# curl -X POST -H 'Content-Type:application/json' -H 'Accept:application/json' http://localhost:8888/account/mirror && curl -X POST -H 'Content-Type:application/json' -H 'Accept:application/json' http://localhost:8080/account/mirror
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.examples;

public class Account {

private String email;
private String userId;

public Account() {
}

public Account(String email, String userId) {
this.email = email;
this.userId = userId;
}

public String getEmail() {
return email;
}

public String getUserId() {
return userId;
}
}
Loading

0 comments on commit e8b21d7

Please sign in to comment.