Skip to content

Commit

Permalink
[Fix #3721] Adding traces
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Oct 24, 2024
1 parent 5d36770 commit a863046
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,9 @@ public static void writeInteger(DataOutput out, Integer integer) throws IOExcept
public static Integer readInteger(DataInput in) throws IOException {
SerType type = readType(in);
return type == SerType.NULL ? null : readInt(in, type);

}

private static void writeInt(DataOutput out, int size) throws IOException {
public static void writeInt(DataOutput out, int size) throws IOException {
if (size < Byte.MAX_VALUE) {
writeType(out, SerType.BYTE);
out.writeByte((byte) size);
Expand All @@ -253,7 +252,7 @@ private static void writeInt(DataOutput out, int size) throws IOException {
}
}

private static int readInt(DataInput in) throws IOException {
public static int readInt(DataInput in) throws IOException {
SerType type = readType(in);
return readInt(in, type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
import java.util.zip.GZIPInputStream;

import org.kie.kogito.event.process.CloudEventVisitor;
import org.kie.kogito.event.process.KogitoEventBodySerializationHelper;
import org.kie.kogito.event.process.KogitoMarshallEventSupport;
import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
Expand All @@ -45,6 +45,8 @@
import org.kie.kogito.event.process.ProcessInstanceStateEventBody;
import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
import org.kie.kogito.event.process.ProcessInstanceVariableEventBody;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.JsonParser;
Expand All @@ -56,8 +58,12 @@

import io.cloudevents.SpecVersion;

import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.readInt;

public class MultipleProcessInstanceDataEventDeserializer extends JsonDeserializer<MultipleProcessInstanceDataEvent> implements ResolvableDeserializer {

private static final Logger logger = LoggerFactory.getLogger(MultipleProcessInstanceDataEventDeserializer.class);

private JsonDeserializer<Object> defaultDeserializer;

public MultipleProcessInstanceDataEventDeserializer(JsonDeserializer<Object> deserializer) {
Expand Down Expand Up @@ -101,24 +107,31 @@ private static boolean isCompressed(JsonNode node) {
public static Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> readFromBytes(byte[] binaryValue, boolean compressed) throws IOException {
InputStream wrappedIn = new ByteArrayInputStream(binaryValue);
if (compressed) {
logger.trace("Gzip compressed byte array");
wrappedIn = new GZIPInputStream(wrappedIn);
}
try (DataInputStream in = new DataInputStream(wrappedIn)) {
int size = in.readShort();
int size = readInt(in);
logger.trace("Reading collection of size {}", size);
Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> result = new ArrayList<>(size);
List<ProcessInstanceDataEventExtensionRecord> infos = new ArrayList<>();
while (size-- > 0) {
byte readInfo = in.readByte();
logger.trace("Info ordinal is {}", readInfo);
ProcessInstanceDataEventExtensionRecord info;
if (readInfo == -1) {
info = new ProcessInstanceDataEventExtensionRecord();
info.readEvent(in);
logger.trace("Info readed is {}", info);
infos.add(info);
} else {
info = infos.get(readInfo);
logger.trace("Info cached is {}", info);
}
String type = in.readUTF();
logger.trace("Type is {}", info);
result.add(getCloudEvent(in, type, info));
logger.trace("{} events remaining", size);
}
return result;
}
Expand All @@ -127,31 +140,44 @@ public static Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventS
private static ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport> getCloudEvent(DataInputStream in, String type, ProcessInstanceDataEventExtensionRecord info) throws IOException {
switch (type) {
case ProcessInstanceVariableDataEvent.VAR_TYPE:
ProcessInstanceVariableDataEvent item = buildDataEvent(in, new ProcessInstanceVariableDataEvent(), new ProcessInstanceVariableEventBody(), info);
ProcessInstanceVariableDataEvent item = buildDataEvent(in, new ProcessInstanceVariableDataEvent(), ProcessInstanceVariableEventBody::new, info);
item.setKogitoVariableName(item.getData().getVariableName());
return item;
case ProcessInstanceStateDataEvent.STATE_TYPE:
return buildDataEvent(in, new ProcessInstanceStateDataEvent(), new ProcessInstanceStateEventBody(), info);
return buildDataEvent(in, new ProcessInstanceStateDataEvent(), ProcessInstanceStateEventBody::new, info);
case ProcessInstanceNodeDataEvent.NODE_TYPE:
return buildDataEvent(in, new ProcessInstanceNodeDataEvent(), new ProcessInstanceNodeEventBody(), info);
return buildDataEvent(in, new ProcessInstanceNodeDataEvent(), ProcessInstanceNodeEventBody::new, info);
case ProcessInstanceErrorDataEvent.ERROR_TYPE:
return buildDataEvent(in, new ProcessInstanceErrorDataEvent(), new ProcessInstanceErrorEventBody(), info);
return buildDataEvent(in, new ProcessInstanceErrorDataEvent(), ProcessInstanceErrorEventBody::new, info);
case ProcessInstanceSLADataEvent.SLA_TYPE:
return buildDataEvent(in, new ProcessInstanceSLADataEvent(), new ProcessInstanceSLAEventBody(), info);
return buildDataEvent(in, new ProcessInstanceSLADataEvent(), ProcessInstanceSLAEventBody::new, info);
default:
throw new UnsupportedOperationException("Unrecognized event type " + type);
}
}

private static <T extends ProcessInstanceDataEvent<V>, V extends KogitoMarshallEventSupport & CloudEventVisitor> T buildDataEvent(DataInput in, T cloudEvent, V body,
private static <T extends ProcessInstanceDataEvent<V>, V extends KogitoMarshallEventSupport & CloudEventVisitor> T buildDataEvent(DataInput in, T cloudEvent, Supplier<V> bodySupplier,
ProcessInstanceDataEventExtensionRecord info) throws IOException {
int delta = KogitoEventBodySerializationHelper.readInteger(in);
int delta = readInt(in);
logger.trace("Time delta is {}", delta);
cloudEvent.setTime(info.getTime().plus(delta, ChronoUnit.MILLIS));
KogitoDataEventSerializationHelper.readCloudEventAttrs(in, cloudEvent);
logger.trace("Cloud event before population {}", cloudEvent);
KogitoDataEventSerializationHelper.populateCloudEvent(cloudEvent, info);
body.readEvent(in);
body.visit(cloudEvent);
cloudEvent.setData(body);
logger.trace("Cloud event after population {}", cloudEvent);

boolean isNotNull = in.readBoolean();
if (isNotNull) {
logger.trace("Data is not null");
V body = bodySupplier.get();
body.readEvent(in);
logger.trace("Event body before population {}", body);
body.visit(cloudEvent);
logger.trace("Event body after population {}", body);
cloudEvent.setData(body);
} else {
logger.trace("Data is null");
}
return cloudEvent;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,22 @@
import java.util.Map;
import java.util.zip.GZIPOutputStream;

import org.kie.kogito.event.process.KogitoEventBodySerializationHelper;
import org.kie.kogito.event.process.KogitoMarshallEventSupport;
import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;

import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.writeInt;

public class MultipleProcessInstanceDataEventSerializer extends JsonSerializer<MultipleProcessInstanceDataEvent> {

private static final Logger logger = LoggerFactory.getLogger(MultipleProcessInstanceDataEventDeserializer.class);

private JsonSerializer<Object> defaultSerializer;

public MultipleProcessInstanceDataEventSerializer(JsonSerializer<Object> serializer) {
Expand Down Expand Up @@ -67,23 +72,41 @@ public void serialize(MultipleProcessInstanceDataEvent value, JsonGenerator gen,
private byte[] dataAsBytes(JsonGenerator gen, Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> data, boolean compress) throws IOException {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
try (DataOutputStream out = new DataOutputStream(compress ? new GZIPOutputStream(bytesOut) : bytesOut)) {
out.writeShort(data.size());
logger.trace("Writing size {}", data.size());
writeInt(out, data.size());
Map<String, ProcessInstanceDataEventExtensionRecord> infos = new HashMap<>();
for (ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport> cloudEvent : data) {
String key = cloudEvent.getKogitoProcessInstanceId();
ProcessInstanceDataEventExtensionRecord info = infos.get(key);
if (info == null) {
out.writeByte(-1);
logger.trace("Writing marker byte -1");
out.writeByte((byte) -1);
info = new ProcessInstanceDataEventExtensionRecord(infos.size(), cloudEvent);
logger.trace("Writing info", info);
info.writeEvent(out);
infos.put(key, info);
} else {
logger.trace("Writing marker byte {}", info.getOrdinal());
out.writeByte((byte) info.getOrdinal());
}
logger.trace("Writing type {}", cloudEvent.getType());
out.writeUTF(cloudEvent.getType());
KogitoEventBodySerializationHelper.writeInteger(out, cloudEvent.getTime().compareTo(info.getTime()));
int timeDelta = cloudEvent.getTime().compareTo(info.getTime());
logger.trace("Writing time delta {}", timeDelta);
writeInt(out, timeDelta);
logger.trace("Writing cloud event attrs {}", cloudEvent);
KogitoDataEventSerializationHelper.writeCloudEventAttrs(out, cloudEvent);
cloudEvent.getData().writeEvent(out);
KogitoMarshallEventSupport itemData = cloudEvent.getData();
if (itemData != null) {
logger.trace("Writing data not null boolean");
out.writeBoolean(true);
logger.trace("Writing cloud event body {}", itemData);
itemData.writeEvent(out);
} else {
logger.trace("Writing data null boolean");
out.writeBoolean(false);
}
logger.trace("individual event writing completed");
}
}
return bytesOut.toByteArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,13 @@ public void readEvent(DataInput in) throws IOException {
source = URI.create(in.readUTF());
addons = readUTF(in);
}

@Override
public String toString() {
return "ProcessInstanceDataEventExtensionRecord [id=" + id + ", instanceId=" + instanceId + ", version="
+ version + ", state=" + state + ", type=" + type + ", parentInstanceId=" + parentInstanceId
+ ", rootId=" + rootId + ", rootInstanceId=" + rootInstanceId + ", businessKey=" + businessKey
+ ", identity=" + identity + ", source=" + source + ", time=" + time + ", addons=" + addons + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,27 @@
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.junit.jupiter.api.Test;
import org.kie.kogito.event.AbstractDataEvent;
import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants;
import org.kie.kogito.event.serializer.MultipleProcessInstanceDataEventDeserializer;
import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent;
import org.kie.kogito.jackson.utils.JsonObjectUtils;
import org.kie.kogito.jackson.utils.ObjectMapperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.jackson.JsonFormat;

Expand All @@ -47,9 +51,9 @@

class ProcessEventsTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.registerModule(new JavaTimeModule())
.registerModule(JsonFormat.getCloudEventJacksonModule())
.disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
.disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
.findAndRegisterModules();

private static final Logger logger = LoggerFactory.getLogger(ProcessEventsTest.class);

Expand Down Expand Up @@ -128,7 +132,7 @@ void processInstanceDataEvent() throws Exception {

@Test
void multipleInstanceDataEvent() throws IOException {
JsonNode expectedVarValue = ObjectMapperFactory.get().createObjectNode().put("name", "John Doe");
JsonNode expectedVarValue = OBJECT_MAPPER.createObjectNode().put("name", "John Doe");
int standard = processMultipleInstanceDataEvent(expectedVarValue, false, false);
int binary = processMultipleInstanceDataEvent(expectedVarValue, true, false);
int binaryCompressed = processMultipleInstanceDataEvent(expectedVarValue, true, true);
Expand Down Expand Up @@ -184,10 +188,53 @@ private int processMultipleInstanceDataEvent(JsonNode expectedVarValue, boolean
event.setCompressed(compress);
}

byte[] json = ObjectMapperFactory.get().writeValueAsBytes(event);
byte[] json = OBJECT_MAPPER.writeValueAsBytes(event);
logger.info("Serialized chunk size is {}", json.length);
MultipleProcessInstanceDataEvent deserializedEvent = ObjectMapperFactory.get().readValue(json, MultipleProcessInstanceDataEvent.class);

// cloud event structured mode check
MultipleProcessInstanceDataEvent deserializedEvent = OBJECT_MAPPER.readValue(json, MultipleProcessInstanceDataEvent.class);
assertThat(deserializedEvent.getData()).hasSize(event.getData().size());
assertMultipleIntance(deserializedEvent, expectedVarValue);

// cloud event binary mode check
CloudEvent cloudEvent = OBJECT_MAPPER.readValue(json, CloudEvent.class);
deserializedEvent = buildEvent(cloudEvent, OBJECT_MAPPER, MultipleProcessInstanceDataEvent::new);
if (cloudEvent.getData() != null) {
String contentType = cloudEvent.getDataContentType();
deserializedEvent.setData(
MultipleProcessInstanceDataEvent.BINARY_CONTENT_TYPE.equals(contentType)
? MultipleProcessInstanceDataEventDeserializer.readFromBytes(Base64.getDecoder().decode(cloudEvent.getData().toBytes()), deserializedEvent.isCompressed())
: OBJECT_MAPPER.readValue(cloudEvent.getData().toBytes(), new TypeReference<Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>>() {
}));
}
assertThat(deserializedEvent.getData()).hasSize(event.getData().size());
assertMultipleIntance(deserializedEvent, expectedVarValue);
return json.length;
}

private static <E extends AbstractDataEvent<?>> E buildEvent(CloudEvent cloudEvent, ObjectMapper objectMapper, Supplier<E> supplier) throws IOException {
E dataEvent = supplier.get();
applyCloudEventAttributes(cloudEvent, dataEvent);
applyExtensions(cloudEvent, dataEvent);
return dataEvent;
}

private static void applyCloudEventAttributes(CloudEvent cloudEvent, AbstractDataEvent<?> dataEvent) {
dataEvent.setSpecVersion(cloudEvent.getSpecVersion());
dataEvent.setId(cloudEvent.getId());
dataEvent.setType(cloudEvent.getType());
dataEvent.setSource(cloudEvent.getSource());
dataEvent.setDataContentType(cloudEvent.getDataContentType());
dataEvent.setDataSchema(cloudEvent.getDataSchema());
dataEvent.setSubject(cloudEvent.getSubject());
dataEvent.setTime(cloudEvent.getTime());
}

private static void applyExtensions(CloudEvent cloudEvent, AbstractDataEvent<?> dataEvent) {
cloudEvent.getExtensionNames().forEach(extensionName -> dataEvent.addExtensionAttribute(extensionName, cloudEvent.getExtension(extensionName)));
}

private void assertMultipleIntance(MultipleProcessInstanceDataEvent deserializedEvent, JsonNode expectedVarValue) {

Iterator<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> iter = deserializedEvent.getData().iterator();
ProcessInstanceStateDataEvent deserializedStateEvent = (ProcessInstanceStateDataEvent) iter.next();
Expand Down Expand Up @@ -215,8 +262,6 @@ private int processMultipleInstanceDataEvent(JsonNode expectedVarValue, boolean
assertBaseEventValues(deserializedSLAEvent, ProcessInstanceSLADataEvent.SLA_TYPE);
assertExtensionNames(deserializedSLAEvent, BASE_EXTENSION_NAMES);
assertSLABody(deserializedSLAEvent.getData());

return json.length;
}

private void assertSLABody(ProcessInstanceSLAEventBody data) {
Expand Down

0 comments on commit a863046

Please sign in to comment.