From d167ae17195a68776c858e2b77aec26a583dad6b Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti <65240126+fjtirado@users.noreply.github.com> Date: Tue, 16 Jul 2024 20:16:58 +0200 Subject: [PATCH] [Fix #3571] Adding onErrors support for starting event state (#3578) * [Fix #3571] Adding onErrors support for starting event state * [Fix #3571] Adding IT test --- .../parser/handlers/EventHandler.java | 4 +- .../src/main/resources/application.properties | 4 + .../resources/eventStartWithError.sw.json | 76 +++++++++++++++++++ .../main/resources/specs/callbackResults.yaml | 7 ++ .../kogito/quarkus/workflows/EventFlowIT.java | 6 +- .../quarkus/workflows/EventTimedoutIT.java | 36 +++++++++ 6 files changed, 129 insertions(+), 4 deletions(-) create mode 100644 quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/eventStartWithError.sw.json diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/EventHandler.java b/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/EventHandler.java index 6c95879a018..7aa2b2eaf55 100644 --- a/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/EventHandler.java +++ b/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/EventHandler.java @@ -59,7 +59,6 @@ public MakeNodeResult makeNode(RuleFlowNodeContainerFactory factory) { handleErrors(factory, embeddedContainer); return new MakeNodeResult(embeddedContainer); } - } private MakeNodeResult processOnEvent(RuleFlowNodeContainerFactory factory, OnEvents onEvent) { @@ -67,6 +66,9 @@ private MakeNodeResult processOnEvent(RuleFlowNodeContainerFactory factory onEvent.getEventRefs(), (fact, onEventRef) -> filterAndMergeNode(fact, onEvent.getEventDataFilter(), getVarName(), (f, inputVar, outputVar) -> buildEventNode(f, onEventRef, inputVar, outputVar))); CompositeContextNodeFactory embeddedSubProcess = handleActions(makeCompositeNode(factory), onEvent.getActions()); + if (isStartState) { + handleErrors(factory, embeddedSubProcess); + } connect(result.getOutgoingNode(), embeddedSubProcess); return new MakeNodeResult(result.getIncomingNode(), embeddedSubProcess); } diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/application.properties b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/application.properties index 8b83e2da2f9..cb8cbdd9adf 100644 --- a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/application.properties +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/application.properties @@ -51,6 +51,10 @@ quarkus.rest-client.enum-parameter_yaml.url=${enum-echo-service-mock.url} # Error handling properties kogito.sw.functions.publishPerfectSquare.host=localhost + +mp.messaging.incoming.start.connector=quarkus-http +mp.messaging.incoming.start.path=/startWithError + mp.messaging.incoming.move.connector=quarkus-http mp.messaging.incoming.move.path=/move diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/eventStartWithError.sw.json b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/eventStartWithError.sw.json new file mode 100644 index 00000000000..e1e87032bee --- /dev/null +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/eventStartWithError.sw.json @@ -0,0 +1,76 @@ +{ + "id": "startEventError", + "version": "1.0", + "name": "Workflow event test", + "description": "An test of a starting event with error on action", + "start": "waitForEvent", + "events": [ + { + "name": "startEvent", + "source": "", + "type": "start" + } + ], + "errors": [ + { + "name": "odd number", + "code": "Odd situation" + } + ], + "functions": [ + { + "name": "publishEvenError", + "type": "asyncapi", + "operation": "specs/callbackResults.yaml#sendEvenError" + }, + { + "name": "isEven", + "type": "custom", + "operation": "service:java:org.kie.kogito.workflows.services.EvenService::isEven" + } + ] + , + "states": [ + { + "name": "waitForEvent", + "type": "event", + "onEvents": [ + { + "eventRefs": [ + "startEvent" + ], + "actions": [ + { + "name": "actionWithError", + "functionRef": { + "refName": "isEven", + "arguments": { + "number": ".number" + } + } + } + ] + + } + ], + "onErrors": [ + { + "errorRef": "odd number", + "transition": "PublishError" + } + ], + "end":true + }, + { + "name": "PublishError", + "type": "operation", + "actions": [ + { + "name": "publishEvenError", + "functionRef": "publishEvenError" + } + ], + "end": "true" + } + ] +} \ No newline at end of file diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/specs/callbackResults.yaml b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/specs/callbackResults.yaml index be2797bae7c..7b7b97d2776 100644 --- a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/specs/callbackResults.yaml +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/specs/callbackResults.yaml @@ -54,6 +54,13 @@ channels: summary: Timeout Expired message: $ref: '#/components/messages/message' + sendEvenError: + description: A message channel for publishing errors + publish: + operationId: sendEvenError + summary: Reporting error + message: + $ref: '#/components/messages/message' error: description: A message channel for failed executions publish: diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java index bd96c501170..dceacae493b 100644 --- a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java @@ -62,16 +62,16 @@ static void init() { } @Test - void testStartingEventWithToStateFilter() { + void testStartingEventWithToStateFilter() throws IOException { given() .contentType(ContentType.JSON) .when() - .body(CloudEventBuilder.v1() + .body(defaultMarshaller.marshall(CloudEventBuilder.v1() .withId(UUID.randomUUID().toString()) .withSource(URI.create("customer-arrival-event-source")) .withType("customer-arrival-type") .withTime(OffsetDateTime.now()) - .withData(defaultMarshaller.cloudEventDataFactory().apply(Collections.singletonMap("customer", Map.of("name", "pepe")))).build()) + .withData(defaultMarshaller.cloudEventDataFactory().apply(Collections.singletonMap("customer", Map.of("name", "pepe")))).build())) .post("/eventWithToStateFilter") .then() .statusCode(202); diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventTimedoutIT.java b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventTimedoutIT.java index 544b016549d..4121ca04805 100644 --- a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventTimedoutIT.java +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventTimedoutIT.java @@ -19,7 +19,11 @@ package org.kie.kogito.quarkus.workflows; import java.io.IOException; +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.Collections; import java.util.Map; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -28,8 +32,10 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.kie.kogito.event.CloudEventMarshaller; import org.kie.kogito.event.Converter; import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants; +import org.kie.kogito.event.impl.ByteArrayCloudEventMarshaller; import org.kie.kogito.event.impl.ByteArrayCloudEventUnmarshallerFactory; import org.kie.kogito.test.quarkus.QuarkusTestProperty; import org.kie.kogito.test.quarkus.kafka.KafkaTypedTestClient; @@ -41,10 +47,14 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; import io.cloudevents.jackson.JsonFormat; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusIntegrationTest; +import io.restassured.RestAssured; +import io.restassured.http.ContentType; +import static io.restassured.RestAssured.given; import static org.assertj.core.api.Assertions.assertThat; import static org.kie.kogito.quarkus.workflows.AssuredTestUtils.startProcess; @@ -59,13 +69,17 @@ public class EventTimedoutIT { private ObjectMapper objectMapper; private KafkaTypedTestClient kafkaClient; + private static CloudEventMarshaller defaultMarshaller; + @BeforeEach void setup() { + RestAssured.enableLoggingOfRequestAndResponseIfValidationFails(); kafkaClient = new KafkaTypedTestClient<>(kafkaBootstrapServers, ByteArraySerializer.class, ByteArrayDeserializer.class); objectMapper = new ObjectMapper() .registerModule(new JavaTimeModule()) .registerModule(JsonFormat.getCloudEventJacksonModule()) .disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + defaultMarshaller = new ByteArrayCloudEventMarshaller(objectMapper); } @AfterEach @@ -93,4 +107,26 @@ void testTimedout() throws InterruptedException { countDownLatch.await(10, TimeUnit.SECONDS); assertThat(countDownLatch.getCount()).isZero(); } + + @Test + void testStartEventWithError() throws InterruptedException, IOException { + given() + .contentType(ContentType.JSON) + .when() + .body(defaultMarshaller.marshall(CloudEventBuilder.v1() + .withId(UUID.randomUUID().toString()) + .withSource(URI.create("source")) + .withType("start") + .withTime(OffsetDateTime.now()) + .withData(defaultMarshaller.cloudEventDataFactory().apply(Collections.singletonMap("number", 3))).build())) + .post("/startWithError") + .then() + .statusCode(202); + final CountDownLatch countDownLatch = new CountDownLatch(1); + kafkaClient.consume("sendEvenError", v -> { + countDownLatch.countDown(); + }); + countDownLatch.await(10, TimeUnit.SECONDS); + assertThat(countDownLatch.getCount()).isZero(); + } }