Skip to content

Commit

Permalink
KOGITO-9849 DataIndex is not processing well the http cloud events
Browse files Browse the repository at this point in the history
    - Ensures that the kogito-addons-quarkus-events-process addon produces the proper content type header when http cloud events are used.
    - Ensure the data index dev services produce the proper content type header.
  • Loading branch information
wmedvede committed Oct 5, 2023
1 parent 3e3de7c commit 5059197
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 26 deletions.
21 changes: 21 additions & 0 deletions quarkus/addons/common/reactive-messaging/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,27 @@
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-api</artifactId>
</dependency>
<!-- These dependencies are needed in case Smallrye is configured with HTTP connector -->
<dependency>
<groupId>io.quarkiverse.reactivemessaging.http</groupId>
<artifactId>quarkus-reactive-messaging-http</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.addon.quarkus.messaging.common.message;
package org.kie.kogito.addon.quarkus.common.reactive.messaging.http;

import javax.ws.rs.core.HttpHeaders;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.addon.quarkus.messaging.common.message;
package org.kie.kogito.addon.quarkus.messaging.common.message.http;

import java.util.Collections;
import java.util.Optional;
Expand All @@ -26,9 +26,8 @@

import org.eclipse.microprofile.reactive.messaging.Message;
import org.junit.jupiter.api.Test;
import org.kie.kogito.KogitoGAV;
import org.kie.kogito.addon.quarkus.common.reactive.messaging.MessageDecoratorProvider;
import org.kie.kogito.config.ConfigBean;
import org.kie.kogito.addon.quarkus.common.reactive.messaging.http.CloudEventHttpOutgoingDecorator;

import io.quarkus.reactivemessaging.http.runtime.OutgoingHttpMetadata;
import io.quarkus.test.junit.QuarkusTest;
Expand All @@ -44,25 +43,6 @@ class CloudEventHttpOutgoingDecoratorTest {
@Produces
CloudEventHttpOutgoingDecorator decorator = new CloudEventHttpOutgoingDecorator();

@Produces
ConfigBean configBean = new ConfigBean() {

@Override
public boolean useCloudEvents() {
return true;
}

@Override
public String getServiceUrl() {
return null;
}

@Override
public Optional<KogitoGAV> getGav() {
return Optional.empty();
}
};

@Test
void verifyOutgoingHttpMetadataIsSet() {
Message<String> message = provider.decorate(Message.of("pepe"));
Expand Down
4 changes: 4 additions & 0 deletions quarkus/addons/events/process/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-addons-quarkus-common-deployment</artifactId>
</dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-quarkus-common-deployment</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@
*/
package org.kie.kogito.events.process.deployment;

import org.kie.kogito.addon.quarkus.common.reactive.messaging.http.CloudEventHttpOutgoingDecorator;
import org.kie.kogito.quarkus.addons.common.deployment.KogitoCapability;
import org.kie.kogito.quarkus.addons.common.deployment.OneOfCapabilityKogitoAddOnProcessor;
import org.kie.kogito.quarkus.common.deployment.KogitoBuildContextBuildItem;
import org.kie.kogito.quarkus.config.KogitoBuildTimeConfig;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.processor.DotNames;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.FeatureBuildItem;

Expand All @@ -37,4 +43,11 @@ FeatureBuildItem feature() {
return new FeatureBuildItem(FEATURE);
}

@BuildStep
void httpMessageDecorator(BuildProducer<AdditionalBeanBuildItem> beanBuildItem, KogitoBuildTimeConfig buildTimeConfig, KogitoBuildContextBuildItem kogitoContext) {
if (buildTimeConfig.useCloudEvents && kogitoContext.getKogitoBuildContext().hasClassAvailable("io.quarkus.reactivemessaging.http.runtime.OutgoingHttpMetadata")) {
beanBuildItem.produce(AdditionalBeanBuildItem.builder().addBeanClass(CloudEventHttpOutgoingDecorator.class).setDefaultScope(DotNames.APPLICATION_SCOPED).build());
}
}

}
4 changes: 4 additions & 0 deletions quarkus/addons/events/process/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
</dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-quarkus-common</artifactId>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.kie.kogito.addon.quarkus.messaging.common.message.CloudEventHttpOutgoingDecorator;
import org.kie.kogito.addon.quarkus.common.reactive.messaging.http.CloudEventHttpOutgoingDecorator;
import org.kie.kogito.jobs.messaging.quarkus.AbstractReactiveMessagingJobsService;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.kie.kogito.addon.quarkus.messaging.common.message.CloudEventHttpOutgoingDecorator.CLOUD_EVENTS_CONTENT_TYPE;
import static org.kie.kogito.addon.quarkus.common.reactive.messaging.http.CloudEventHttpOutgoingDecorator.CLOUD_EVENTS_CONTENT_TYPE;

class KnativeEventingJobsServiceTest extends AbstractReactiveMessagingJobsServiceTest<KnativeEventingJobsService> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.drools.codegen.common.GeneratedFileType;
import org.jboss.jandex.DotName;
import org.jbpm.compiler.canonical.ProcessMetaData;
import org.kie.kogito.addon.quarkus.messaging.common.message.CloudEventHttpOutgoingDecorator;
import org.kie.kogito.addon.quarkus.common.reactive.messaging.http.CloudEventHttpOutgoingDecorator;
import org.kie.kogito.codegen.api.context.KogitoBuildContext;
import org.kie.kogito.codegen.process.ProcessGenerator;
import org.kie.kogito.quarkus.addons.common.deployment.AnyEngineKogitoAddOnProcessor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class DataIndexEventPublisher implements EventPublisher {

public static final String KOGITO_DATA_INDEX = "kogito.data-index.url";
private static final Logger LOGGER = LoggerFactory.getLogger(DataIndexEventPublisher.class);
private static final String CLOUD_EVENTS_CONTENT_TYPE = "application/cloudevents+json";
private static final String CONTENT_TYPE = "content-type";

@ConfigProperty(name = KOGITO_DATA_INDEX)
Optional<String> dataIndexUrl;
Expand All @@ -62,6 +64,7 @@ public void publish(DataEvent<?> event) {
switch (event.getType()) {
case "ProcessInstanceEvent":
webClient.postAbs(dataIndexUrl.get() + "/processes")
.putHeader(CONTENT_TYPE, CLOUD_EVENTS_CONTENT_TYPE)
.expect(ResponsePredicate.SC_ACCEPTED)
.sendJson(event, result -> {
if (result.failed()) {
Expand All @@ -73,6 +76,7 @@ public void publish(DataEvent<?> event) {
break;
case "UserTaskInstanceEvent":
webClient.postAbs(dataIndexUrl.get() + "/tasks")
.putHeader(CONTENT_TYPE, CLOUD_EVENTS_CONTENT_TYPE)
.expect(ResponsePredicate.SC_ACCEPTED)
.sendJson(event, result -> {
if (result.failed()) {
Expand Down

0 comments on commit 5059197

Please sign in to comment.