Skip to content

Commit

Permalink
[Fix apache/incubator-kie-issues#1457] Grouping of event serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Sep 26, 2024
1 parent 7d99fb4 commit 6b58fcd
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,15 @@ public abstract class AbstractDataEvent<T> implements DataEvent<T> {
protected AbstractDataEvent() {
}

protected AbstractDataEvent(String type, URI source, T body) {
this.specVersion = SpecVersion.parse(SPEC_VERSION);
this.id = UUID.randomUUID().toString();
this.source = source;
this.type = type;
this.time = ZonedDateTime.now().toOffsetDateTime();
this.data = body;
}

protected AbstractDataEvent(String type,
String source,
T body,
Expand All @@ -201,12 +210,7 @@ protected AbstractDataEvent(String type,
String subject,
String dataContentType,
String dataSchema) {
this.specVersion = SpecVersion.parse(SPEC_VERSION);
this.id = UUID.randomUUID().toString();
this.source = Optional.ofNullable(source).map(URI::create).orElse(null);
this.type = type;
this.time = ZonedDateTime.now().toOffsetDateTime();
this.data = body;
this(type, Optional.ofNullable(source).map(URI::create).orElse(null), body);
setKogitoProcessInstanceId(kogitoProcessInstanceId);
setKogitoRootProcessInstanceId(kogitoRootProcessInstanceId);
setKogitoProcessId(kogitoProcessId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.event.process;

import java.net.URI;
import java.util.Collection;

public class MultipleProcessInstanceDataEvent extends ProcessInstanceDataEvent<Collection<ProcessInstanceDataEvent<?>>> {

public MultipleProcessInstanceDataEvent(URI source, Collection<ProcessInstanceDataEvent<?>> body) {
super("MultipleProcessInstanceDataEvent", source, body);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.kie.kogito.event.process;

import java.net.URI;

import org.kie.kogito.event.AbstractDataEvent;

public class ProcessInstanceDataEvent<T> extends AbstractDataEvent<T> {
Expand All @@ -29,6 +31,10 @@ public ProcessInstanceDataEvent(T body) {
setData(body);
}

protected ProcessInstanceDataEvent(String type, URI source, T body) {
super(type, source, body);
}

public ProcessInstanceDataEvent(String type,
String source,
T body,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.event.usertask;

import java.net.URI;
import java.util.Collection;

public class MultipleUserTaskInstanceDataEvent extends UserTaskInstanceDataEvent<Collection<UserTaskInstanceDataEvent<?>>> {

public MultipleUserTaskInstanceDataEvent(URI source, Collection<UserTaskInstanceDataEvent<?>> body) {
super("MultipleUserTaskInstanceDataEvent", source, body);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.kie.kogito.event.usertask;

import java.net.URI;
import java.util.Set;

import org.kie.kogito.event.AbstractDataEvent;
Expand Down Expand Up @@ -48,6 +49,10 @@ public UserTaskInstanceDataEvent(T body) {
setData(body);
}

protected UserTaskInstanceDataEvent(String type, URI source, T body) {
super(type, source, body);
}

public UserTaskInstanceDataEvent(String type,
String source,
T body,
Expand Down
2 changes: 1 addition & 1 deletion kogito-build/kogito-dependencies-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
<version.io.quarkiverse.jackson-jq>2.0.2</version.io.quarkiverse.jackson-jq>
<version.io.quarkiverse.openapi.generator>2.4.1</version.io.quarkiverse.openapi.generator>
<version.io.quarkiverse.asyncapi>0.3.0</version.io.quarkiverse.asyncapi>
<version.io.quarkiverse.reactivemessaging.http>2.2.0</version.io.quarkiverse.reactivemessaging.http>
<version.io.quarkiverse.reactivemessaging.http>2.3.1</version.io.quarkiverse.reactivemessaging.http>
<version.io.quarkiverse.embedded.postgresql>0.2.3</version.io.quarkiverse.embedded.postgresql>
<version.com.github.haifengl.smile>1.5.2</version.com.github.haifengl.smile>
<version.com.github.javaparser>3.25.8</version.com.github.javaparser>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ public void init() {
}

protected Optional<AbstractMessageEmitter> getConsumer(DataEvent<?> event) {
if (event == null) {
return Optional.empty();
}
switch (event.getType()) {
case "ProcessDefinitionEvent":
return eventsRuntimeConfig.isProcessDefinitionEventsEnabled() ? Optional.of(processDefinitionConsumer) : Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.kie.kogito.events.process;

import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -26,6 +27,10 @@
import java.util.Map.Entry;

import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;

import io.quarkus.arc.properties.IfBuildProperty;

Expand All @@ -40,18 +45,28 @@ public void publish(DataEvent<?> event) {
publish(Collections.singletonList(event));
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void publish(Collection<DataEvent<?>> events) {
Map<AbstractMessageEmitter, Collection<DataEvent<?>>> eventsByChannel = new HashMap<>();
Map<AbstractMessageEmitter, Collection> eventsByChannel = new HashMap<>();
for (DataEvent<?> event : events) {
if (event == null) {
continue;
}
getConsumer(event).ifPresent(c -> eventsByChannel.computeIfAbsent(c, k -> new ArrayList<>()).add(event));
}
for (Entry<AbstractMessageEmitter, Collection<DataEvent<?>>> item : eventsByChannel.entrySet()) {
publishToTopic(item.getKey(), item.getValue());
}
eventsByChannel.entrySet().forEach(this::publishEvents);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
private void publishEvents(Map.Entry<AbstractMessageEmitter, Collection> entry) {
DataEvent<?> firstEvent = (DataEvent<?>) entry.getValue().iterator().next();
URI source = firstEvent.getSource();
if (firstEvent instanceof UserTaskInstanceDataEvent) {
publishToTopic(entry.getKey(), new MultipleUserTaskInstanceDataEvent(source, (Collection<UserTaskInstanceDataEvent<?>>) entry.getValue()));
} else if (firstEvent instanceof ProcessInstanceDataEvent) {
publishToTopic(entry.getKey(), new MultipleProcessInstanceDataEvent(source, (Collection<ProcessInstanceDataEvent<?>>) entry.getValue()));
} else {
for (DataEvent<?> event : (Collection<DataEvent<?>>) entry.getValue()) {
publishToTopic(entry.getKey(), event);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.kie.kogito.addon.quarkus.common.reactive.messaging.MessageDecoratorProvider;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
import org.kie.kogito.events.config.EventsRuntimeConfig;
import org.kie.kogito.events.process.AbstractMessagingEventPublisher.AbstractMessageEmitter;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -149,10 +153,10 @@ public void testReactiveMessagingEventPublisher_publish() throws Exception {
@Test
public void testPublishGroupingByChannel() {
// Create mock events
DataEvent<String> processInstanceEvent = mock(DataEvent.class);
DataEvent<String> processInstanceEvent = mock(ProcessInstanceDataEvent.class);
when(processInstanceEvent.getType()).thenReturn("ProcessInstanceStateDataEvent");

DataEvent<String> userTaskEvent = mock(DataEvent.class);
DataEvent<String> userTaskEvent = mock(UserTaskInstanceDataEvent.class);
when(userTaskEvent.getType()).thenReturn("UserTaskInstanceStateDataEvent");

// Mock getConsumer() to return different emitters based on event type
Expand All @@ -169,17 +173,17 @@ public void testPublishGroupingByChannel() {
groupingMessagingEventPublisher.publish(events);

// Capture and verify that the correct emitter was used for each event
verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), anyCollection());
verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(userTaskConsumer), anyCollection());
verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), any(MultipleProcessInstanceDataEvent.class));
verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(userTaskConsumer), any(MultipleUserTaskInstanceDataEvent.class));
}

@Test
public void testPublishMultipleEventsGroupedByChannel() {
// Create multiple events of different types
DataEvent<String> processInstanceEvent1 = mock(DataEvent.class);
DataEvent<String> processInstanceEvent2 = mock(DataEvent.class);
DataEvent<String> userTaskEvent1 = mock(DataEvent.class);
DataEvent<String> userTaskEvent2 = mock(DataEvent.class);
DataEvent<String> processInstanceEvent1 = mock(ProcessInstanceDataEvent.class);
DataEvent<String> processInstanceEvent2 = mock(ProcessInstanceDataEvent.class);
DataEvent<String> userTaskEvent1 = mock(UserTaskInstanceDataEvent.class);
DataEvent<String> userTaskEvent2 = mock(UserTaskInstanceDataEvent.class);

when(processInstanceEvent1.getType()).thenReturn("ProcessInstanceStateDataEvent");
when(processInstanceEvent2.getType()).thenReturn("ProcessInstanceStateDataEvent");
Expand All @@ -202,19 +206,21 @@ public void testPublishMultipleEventsGroupedByChannel() {
groupingMessagingEventPublisher.publish(events);

// Verify that two grouped publishToTopic calls are made: one for processInstanceConsumer, one for userTaskConsumer
verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), anyCollection());
verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(userTaskConsumer), anyCollection());
verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), any(MultipleProcessInstanceDataEvent.class));
verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(userTaskConsumer), any(MultipleUserTaskInstanceDataEvent.class));

// Verify that the right number of events was grouped and passed to each emitter
ArgumentCaptor<Collection<DataEvent<?>>> captor = ArgumentCaptor.forClass(Collection.class);
ArgumentCaptor<MultipleProcessInstanceDataEvent> captorPI = ArgumentCaptor.forClass(MultipleProcessInstanceDataEvent.class);

verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), captor.capture());
Collection<DataEvent<?>> groupedProcessInstanceEvents = captor.getValue();
assertEquals(2, groupedProcessInstanceEvents.size()); // both processInstanceEvents are grouped
verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), captorPI.capture());
MultipleProcessInstanceDataEvent groupedProcessInstanceEvents = captorPI.getValue();
assertEquals(2, groupedProcessInstanceEvents.getData().size()); // both processInstanceEvents are grouped

verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(userTaskConsumer), captor.capture());
Collection<DataEvent<?>> groupedUserTaskEvents = captor.getValue();
assertEquals(2, groupedUserTaskEvents.size()); // both userTaskEvents are grouped
ArgumentCaptor<MultipleUserTaskInstanceDataEvent> captorUT = ArgumentCaptor.forClass(MultipleUserTaskInstanceDataEvent.class);

verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(userTaskConsumer), captorUT.capture());
MultipleUserTaskInstanceDataEvent groupedUserTaskEvents = captorUT.getValue();
assertEquals(2, groupedUserTaskEvents.getData().size()); // both userTaskEvents are grouped
}

@Test
Expand Down Expand Up @@ -310,7 +316,7 @@ public void testEventsDisabledInConfig() {

@Test
public void testNullEventInCollection() {
DataEvent<String> validEvent = mock(DataEvent.class);
DataEvent<String> validEvent = mock(ProcessInstanceDataEvent.class);
when(validEvent.getType()).thenReturn("ProcessInstanceStateDataEvent");

Collection<DataEvent<?>> events = Arrays.asList(validEvent, null); // One valid event and one null event
Expand All @@ -322,7 +328,7 @@ public void testNullEventInCollection() {
groupingMessagingEventPublisher.publish(events);

// Verify the valid event is processed
verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), anyCollection());
verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), any(MultipleProcessInstanceDataEvent.class));
}

@Test
Expand Down Expand Up @@ -361,7 +367,7 @@ public void testPublishToTopicWithDecorator() throws Exception {

@Test
public void testPublishWithMultipleEventTypesSomeWithoutConsumers() {
DataEvent<String> processInstanceEvent = mock(DataEvent.class);
DataEvent<String> processInstanceEvent = mock(ProcessInstanceDataEvent.class);
when(processInstanceEvent.getType()).thenReturn("ProcessInstanceStateDataEvent");

DataEvent<String> unsupportedEvent = mock(DataEvent.class);
Expand All @@ -375,7 +381,7 @@ public void testPublishWithMultipleEventTypesSomeWithoutConsumers() {
groupingMessagingEventPublisher.publish(events);

// Ensure that only the supported event was published
verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), anyCollection());
verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), any(MultipleProcessInstanceDataEvent.class));
verify(groupingMessagingEventPublisher, never()).publishToTopic(any(), eq(Collections.singletonList(unsupportedEvent)));
}

Expand Down

0 comments on commit 6b58fcd

Please sign in to comment.