Skip to content

Commit

Permalink
[Fix #3721] Refactoring KogitoIndexConverter
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Oct 24, 2024
1 parent 5d36770 commit 0be02c9
Show file tree
Hide file tree
Showing 11 changed files with 232 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,9 @@ public static void writeInteger(DataOutput out, Integer integer) throws IOExcept
public static Integer readInteger(DataInput in) throws IOException {
SerType type = readType(in);
return type == SerType.NULL ? null : readInt(in, type);

}

private static void writeInt(DataOutput out, int size) throws IOException {
public static void writeInt(DataOutput out, int size) throws IOException {
if (size < Byte.MAX_VALUE) {
writeType(out, SerType.BYTE);
out.writeByte((byte) size);
Expand All @@ -253,7 +252,7 @@ private static void writeInt(DataOutput out, int size) throws IOException {
}
}

private static int readInt(DataInput in) throws IOException {
public static int readInt(DataInput in) throws IOException {
SerType type = readType(in);
return readInt(in, type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.kie.kogito.event;

import java.io.IOException;
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.Optional;
Expand All @@ -43,6 +44,22 @@ public static <T> DataEvent<T> from(CloudEvent event, Converter<CloudEventData,
return new CloudEventWrapDataEvent<>(event, dataUnmarshaller);
}

public static <T extends AbstractDataEvent<V>, V> T from(T dataEvent, CloudEvent cloudEvent, Converter<CloudEventData, V> dataUnmarshaller) throws IOException {
dataEvent.setSpecVersion(cloudEvent.getSpecVersion());
dataEvent.setId(cloudEvent.getId());
dataEvent.setType(cloudEvent.getType());
dataEvent.setSource(cloudEvent.getSource());
dataEvent.setDataContentType(cloudEvent.getDataContentType());
dataEvent.setDataSchema(cloudEvent.getDataSchema());
dataEvent.setSubject(cloudEvent.getSubject());
dataEvent.setTime(cloudEvent.getTime());
cloudEvent.getExtensionNames().forEach(extensionName -> dataEvent.addExtensionAttribute(extensionName, cloudEvent.getExtension(extensionName)));
if (cloudEvent.getData() != null) {
dataEvent.setData(dataUnmarshaller.convert(cloudEvent.getData()));
}
return dataEvent;
}

public static <T> DataEvent<T> from(T eventData, String trigger, KogitoProcessInstance pi) {
return from(eventData, trigger, URI.create("/process/" + pi.getProcessId()), Optional.empty(), ProcessMeta.fromKogitoProcessInstance(pi));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.event.impl;

import java.io.IOException;

import org.kie.kogito.event.Converter;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.cloudevents.CloudEventData;

public class JacksonTypeCloudEventDataConverter<O> implements Converter<CloudEventData, O> {

private ObjectMapper objectMapper;
private TypeReference<O> outputType;

public JacksonTypeCloudEventDataConverter(ObjectMapper objectMapper, TypeReference<O> outputType) {
this.objectMapper = objectMapper;
this.outputType = outputType;
}

@Override
public O convert(CloudEventData value) throws IOException {
return objectMapper.readValue(value.toBytes(), outputType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,4 @@ public boolean isCompressed() {
public void setCompressed(boolean compressed) {
addExtensionAttribute(COMPRESS_DATA, compressed);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*;

public class KogitoDataEventSerializationHelper {
class KogitoDataEventSerializationHelper {

private KogitoDataEventSerializationHelper() {
}
Expand All @@ -49,7 +49,10 @@ static <T extends AbstractDataEvent<?>> T readCloudEventAttrs(DataInput in, T da
data.setId(in.readUTF());
data.setSubject(readUTF(in));
data.setDataContentType(readUTF(in));
data.setDataSchema(URI.create(readUTF(in)));
String dataSchema = readUTF(in);
if (dataSchema != null) {
data.setDataSchema(URI.create(dataSchema));
}
return data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

public class MultipleProcessDataInstanceBeanDeserializerModifier extends BeanDeserializerModifier {

private static final long serialVersionUID = 1L;

@Override
public JsonDeserializer<?> modifyDeserializer(
DeserializationConfig config, BeanDescription beanDesc, JsonDeserializer<?> deserializer) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.event.serializer;

import java.io.IOException;
import java.util.Base64;
import java.util.Collection;

import org.kie.kogito.event.Converter;
import org.kie.kogito.event.impl.JacksonTypeCloudEventDataConverter;
import org.kie.kogito.event.process.KogitoMarshallEventSupport;
import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;

public class MultipleProcessDataInstanceConverterFactory {

private MultipleProcessDataInstanceConverterFactory() {
}

public static Converter<CloudEventData, Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>> fromCloudEvent(CloudEvent cloudEvent, ObjectMapper objectMapper) {
if (MultipleProcessInstanceDataEvent.BINARY_CONTENT_TYPE.equals(cloudEvent.getDataContentType())) {
return isCompressed(cloudEvent) ? compressedConverter : binaryConverter;
} else {
return new JacksonTypeCloudEventDataConverter<>(objectMapper, new TypeReference<Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>>() {
});
}
}

private static boolean isCompressed(CloudEvent event) {
Object value = event.getExtension(MultipleProcessInstanceDataEvent.COMPRESS_DATA);
return value instanceof Boolean ? ((Boolean) value).booleanValue() : false;
}

private static Converter<CloudEventData, Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>> binaryConverter =
data -> deserialize(data, false);

private static Converter<CloudEventData, Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>> compressedConverter =
data -> deserialize(data, true);

private static Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> deserialize(CloudEventData data, boolean compress) throws IOException {
return MultipleProcessInstanceDataEventDeserializer.readFromBytes(Base64.getDecoder().decode(data.toBytes()), compress);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
import java.util.zip.GZIPInputStream;

import org.kie.kogito.event.process.CloudEventVisitor;
import org.kie.kogito.event.process.KogitoEventBodySerializationHelper;
import org.kie.kogito.event.process.KogitoMarshallEventSupport;
import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
Expand All @@ -45,6 +45,8 @@
import org.kie.kogito.event.process.ProcessInstanceStateEventBody;
import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
import org.kie.kogito.event.process.ProcessInstanceVariableEventBody;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.JsonParser;
Expand All @@ -56,8 +58,12 @@

import io.cloudevents.SpecVersion;

import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.readInt;

public class MultipleProcessInstanceDataEventDeserializer extends JsonDeserializer<MultipleProcessInstanceDataEvent> implements ResolvableDeserializer {

private static final Logger logger = LoggerFactory.getLogger(MultipleProcessInstanceDataEventDeserializer.class);

private JsonDeserializer<Object> defaultDeserializer;

public MultipleProcessInstanceDataEventDeserializer(JsonDeserializer<Object> deserializer) {
Expand Down Expand Up @@ -98,27 +104,34 @@ private static boolean isCompressed(JsonNode node) {
return compress != null && compress.isBoolean() ? compress.asBoolean() : false;
}

public static Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> readFromBytes(byte[] binaryValue, boolean compressed) throws IOException {
static Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> readFromBytes(byte[] binaryValue, boolean compressed) throws IOException {
InputStream wrappedIn = new ByteArrayInputStream(binaryValue);
if (compressed) {
logger.trace("Gzip compressed byte array");
wrappedIn = new GZIPInputStream(wrappedIn);
}
try (DataInputStream in = new DataInputStream(wrappedIn)) {
int size = in.readShort();
int size = readInt(in);
logger.trace("Reading collection of size {}", size);
Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> result = new ArrayList<>(size);
List<ProcessInstanceDataEventExtensionRecord> infos = new ArrayList<>();
while (size-- > 0) {
byte readInfo = in.readByte();
logger.trace("Info ordinal is {}", readInfo);
ProcessInstanceDataEventExtensionRecord info;
if (readInfo == -1) {
info = new ProcessInstanceDataEventExtensionRecord();
info.readEvent(in);
logger.trace("Info readed is {}", info);
infos.add(info);
} else {
info = infos.get(readInfo);
logger.trace("Info cached is {}", info);
}
String type = in.readUTF();
logger.trace("Type is {}", info);
result.add(getCloudEvent(in, type, info));
logger.trace("{} events remaining", size);
}
return result;
}
Expand All @@ -127,31 +140,44 @@ public static Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventS
private static ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport> getCloudEvent(DataInputStream in, String type, ProcessInstanceDataEventExtensionRecord info) throws IOException {
switch (type) {
case ProcessInstanceVariableDataEvent.VAR_TYPE:
ProcessInstanceVariableDataEvent item = buildDataEvent(in, new ProcessInstanceVariableDataEvent(), new ProcessInstanceVariableEventBody(), info);
ProcessInstanceVariableDataEvent item = buildDataEvent(in, new ProcessInstanceVariableDataEvent(), ProcessInstanceVariableEventBody::new, info);
item.setKogitoVariableName(item.getData().getVariableName());
return item;
case ProcessInstanceStateDataEvent.STATE_TYPE:
return buildDataEvent(in, new ProcessInstanceStateDataEvent(), new ProcessInstanceStateEventBody(), info);
return buildDataEvent(in, new ProcessInstanceStateDataEvent(), ProcessInstanceStateEventBody::new, info);
case ProcessInstanceNodeDataEvent.NODE_TYPE:
return buildDataEvent(in, new ProcessInstanceNodeDataEvent(), new ProcessInstanceNodeEventBody(), info);
return buildDataEvent(in, new ProcessInstanceNodeDataEvent(), ProcessInstanceNodeEventBody::new, info);
case ProcessInstanceErrorDataEvent.ERROR_TYPE:
return buildDataEvent(in, new ProcessInstanceErrorDataEvent(), new ProcessInstanceErrorEventBody(), info);
return buildDataEvent(in, new ProcessInstanceErrorDataEvent(), ProcessInstanceErrorEventBody::new, info);
case ProcessInstanceSLADataEvent.SLA_TYPE:
return buildDataEvent(in, new ProcessInstanceSLADataEvent(), new ProcessInstanceSLAEventBody(), info);
return buildDataEvent(in, new ProcessInstanceSLADataEvent(), ProcessInstanceSLAEventBody::new, info);
default:
throw new UnsupportedOperationException("Unrecognized event type " + type);
}
}

private static <T extends ProcessInstanceDataEvent<V>, V extends KogitoMarshallEventSupport & CloudEventVisitor> T buildDataEvent(DataInput in, T cloudEvent, V body,
private static <T extends ProcessInstanceDataEvent<V>, V extends KogitoMarshallEventSupport & CloudEventVisitor> T buildDataEvent(DataInput in, T cloudEvent, Supplier<V> bodySupplier,
ProcessInstanceDataEventExtensionRecord info) throws IOException {
int delta = KogitoEventBodySerializationHelper.readInteger(in);
int delta = readInt(in);
logger.trace("Time delta is {}", delta);
cloudEvent.setTime(info.getTime().plus(delta, ChronoUnit.MILLIS));
KogitoDataEventSerializationHelper.readCloudEventAttrs(in, cloudEvent);
logger.trace("Cloud event before population {}", cloudEvent);
KogitoDataEventSerializationHelper.populateCloudEvent(cloudEvent, info);
body.readEvent(in);
body.visit(cloudEvent);
cloudEvent.setData(body);
logger.trace("Cloud event after population {}", cloudEvent);

boolean isNotNull = in.readBoolean();
if (isNotNull) {
logger.trace("Data is not null");
V body = bodySupplier.get();
body.readEvent(in);
logger.trace("Event body before population {}", body);
body.visit(cloudEvent);
logger.trace("Event body after population {}", body);
cloudEvent.setData(body);
} else {
logger.trace("Data is null");
}
return cloudEvent;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,22 @@
import java.util.Map;
import java.util.zip.GZIPOutputStream;

import org.kie.kogito.event.process.KogitoEventBodySerializationHelper;
import org.kie.kogito.event.process.KogitoMarshallEventSupport;
import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;

import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.writeInt;

public class MultipleProcessInstanceDataEventSerializer extends JsonSerializer<MultipleProcessInstanceDataEvent> {

private static final Logger logger = LoggerFactory.getLogger(MultipleProcessInstanceDataEventDeserializer.class);

private JsonSerializer<Object> defaultSerializer;

public MultipleProcessInstanceDataEventSerializer(JsonSerializer<Object> serializer) {
Expand Down Expand Up @@ -67,23 +72,41 @@ public void serialize(MultipleProcessInstanceDataEvent value, JsonGenerator gen,
private byte[] dataAsBytes(JsonGenerator gen, Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> data, boolean compress) throws IOException {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
try (DataOutputStream out = new DataOutputStream(compress ? new GZIPOutputStream(bytesOut) : bytesOut)) {
out.writeShort(data.size());
logger.trace("Writing size {}", data.size());
writeInt(out, data.size());
Map<String, ProcessInstanceDataEventExtensionRecord> infos = new HashMap<>();
for (ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport> cloudEvent : data) {
String key = cloudEvent.getKogitoProcessInstanceId();
ProcessInstanceDataEventExtensionRecord info = infos.get(key);
if (info == null) {
out.writeByte(-1);
logger.trace("Writing marker byte -1");
out.writeByte((byte) -1);
info = new ProcessInstanceDataEventExtensionRecord(infos.size(), cloudEvent);
logger.trace("Writing info", info);
info.writeEvent(out);
infos.put(key, info);
} else {
logger.trace("Writing marker byte {}", info.getOrdinal());
out.writeByte((byte) info.getOrdinal());
}
logger.trace("Writing type {}", cloudEvent.getType());
out.writeUTF(cloudEvent.getType());
KogitoEventBodySerializationHelper.writeInteger(out, cloudEvent.getTime().compareTo(info.getTime()));
int timeDelta = cloudEvent.getTime().compareTo(info.getTime());
logger.trace("Writing time delta {}", timeDelta);
writeInt(out, timeDelta);
logger.trace("Writing cloud event attrs {}", cloudEvent);
KogitoDataEventSerializationHelper.writeCloudEventAttrs(out, cloudEvent);
cloudEvent.getData().writeEvent(out);
KogitoMarshallEventSupport itemData = cloudEvent.getData();
if (itemData != null) {
logger.trace("Writing data not null boolean");
out.writeBoolean(true);
logger.trace("Writing cloud event body {}", itemData);
itemData.writeEvent(out);
} else {
logger.trace("Writing data null boolean");
out.writeBoolean(false);
}
logger.trace("individual event writing completed");
}
}
return bytesOut.toByteArray();
Expand Down
Loading

0 comments on commit 0be02c9

Please sign in to comment.