Skip to content

Commit

Permalink
[FIX_LOGGING] add some debug lines and throwable catch to avoid data …
Browse files Browse the repository at this point in the history
…loss
  • Loading branch information
elguardian committed Jun 10, 2024
1 parent 492f41a commit b9a9134
Showing 1 changed file with 52 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@
import com.fasterxml.jackson.databind.SerializationFeature;

/**
* Kafka implementation of EventEmitter that simply pushes out data to several Kafka topics depending on InstanceView type.
* Kafka implementation of EventEmitter that simply pushes out data to several Kafka topics depending on InstanceView type.
*
* Expects following parameters to configure itself - via system properties
* <ul>
* <li>org.kie.jbpm.event.emitters.kafka.date_format - date and time format to be sent to Kafka - default format is yyyy-MM-dd'T'HH:mm:ss.SSSZ</li>
* <li>org.kie.jbpm.event.emitters.kafka.bootstrap.servers - Kafka server ip, default is localhost:9092</li>
* <li>org.kie.jbpm.event.emitters.kafka.client.id - Kafka client id</li>
* <li>org.kie.jbpm.event.emitters.kafka.acks - Kafka acknowledge policy, check <a href="http://kafka.apache.org/documentation.html#producerconfigs">Kafka documentation</a></li>
* <li>org.kie.jbpm.event.emitters.kafka.topic.<processes|tasks|cases>. Topic name for subscribing to these events. Defaults are "jbpm-<processes|tasks|cases>-events"</li>
* </ul>
* <li>org.kie.jbpm.event.emitters.kafka.date_format - date and time format to be sent to Kafka - default format is yyyy-MM-dd'T'HH:mm:ss.SSSZ</li>
* <li>org.kie.jbpm.event.emitters.kafka.bootstrap.servers - Kafka server ip, default is localhost:9092</li>
* <li>org.kie.jbpm.event.emitters.kafka.client.id - Kafka client id</li>
* <li>org.kie.jbpm.event.emitters.kafka.acks - Kafka acknowledge policy, check <a href="http://kafka.apache.org/documentation.html#producerconfigs">Kafka documentation</a></li>
* <li>org.kie.jbpm.event.emitters.kafka.topic.<processes|tasks|cases>. Topic name for subscribing to these events. Defaults are "jbpm-<processes|tasks|cases>-events"</li>
* </ul>
*/
public class KafkaEventEmitter implements EventEmitter {

Expand All @@ -62,7 +62,7 @@ public class KafkaEventEmitter implements EventEmitter {
protected static final String KAFKA_EMITTER_PREFIX = "org.kie.jbpm.event.emitters.kafka.";

private ObjectMapper mapper;

private KafkaSender sender;

private Producer<String, byte[]> producer;
Expand All @@ -73,10 +73,10 @@ public KafkaEventEmitter() {

KafkaEventEmitter(Producer<String, byte[]> producer) {
this.producer = producer;
this.sender = Boolean.getBoolean(KAFKA_EMITTER_PREFIX+"sync") ? this::sendSync : this::sendAsync;
mapper = new ObjectMapper()
this.sender = Boolean.getBoolean(KAFKA_EMITTER_PREFIX + "sync") ? this::sendSync : this::sendAsync;
this.mapper = new ObjectMapper()
.setDateFormat(new SimpleDateFormat(System.getProperty(
KAFKA_EMITTER_PREFIX+"date_format", System.getProperty(
KAFKA_EMITTER_PREFIX + "date_format", System.getProperty(
"org.kie.server.json.date_format",
"yyyy-MM-dd'T'HH:mm:ss.SSSZ"))))
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
Expand All @@ -95,59 +95,69 @@ public void apply(Collection<InstanceView<?>> data) {
}

for (InstanceView<?> view : data) {
String processId;
long processInstanceId;
String type;
String topic;
if (view instanceof ProcessInstanceView) {
ProcessInstanceView processInstanceView = (ProcessInstanceView) view;
topic = "processes";
type = "process";
processInstanceId = processInstanceView.getId();
processId = processInstanceView.getProcessId();
} else if (view instanceof TaskInstanceView) {
TaskInstanceView taskInstanceView = (TaskInstanceView) view;
topic = "tasks";
type = "task";
processInstanceId = taskInstanceView.getProcessInstanceId();
processId = taskInstanceView.getProcessId();
} else if (view instanceof CaseInstanceView) {
CaseInstanceView caseInstanceView = (CaseInstanceView) view;
topic = "cases";
type = "case";
processInstanceId = caseInstanceView.getId();
processId = caseInstanceView.getCaseDefinitionId();
} else {
logger.warn("Unsupported view type {}", view.getClass());
continue;
try {
String processId;
long processInstanceId;
String type;
String topic;
if (view instanceof ProcessInstanceView) {
ProcessInstanceView processInstanceView = (ProcessInstanceView) view;
topic = "processes";
type = "process";
processInstanceId = processInstanceView.getId();
processId = processInstanceView.getProcessId();
} else if (view instanceof TaskInstanceView) {
TaskInstanceView taskInstanceView = (TaskInstanceView) view;
topic = "tasks";
type = "task";
processInstanceId = taskInstanceView.getProcessInstanceId();
processId = taskInstanceView.getProcessId();
} else if (view instanceof CaseInstanceView) {
CaseInstanceView caseInstanceView = (CaseInstanceView) view;
topic = "cases";
type = "case";
processInstanceId = caseInstanceView.getId();
processId = caseInstanceView.getCaseDefinitionId();
} else {
logger.warn("Unsupported view type {}", view.getClass());
continue;
}
sender.send(topic, type, processId, processInstanceId, view);
} catch (Throwable th) {
logError(view, th);
}
sender.send(topic, type, processId, processInstanceId, view);

}
}

private interface KafkaSender {
void send (String topic, String type, String processId, long processInstanceId, InstanceView<?> view);
void send(String topic, String type, String processId, long processInstanceId, InstanceView<?> view);
}

private byte[] viewToPayload(String type, String processId, long processInstanceId, InstanceView<?> view) throws JsonProcessingException {
return mapper.writeValueAsBytes(new CloudEventSpec1(type, String.format(SOURCE_FORMATTER, processId, processInstanceId), view));
}

private void sendAsync(String topic, String type, String processId, long processInstanceId, InstanceView<?> view) {
logger.debug("Sending async view to topic {} type {} processId {} pid {} and view {}", topic, type, processId, processInstanceId, view);
try {
producer.send(new ProducerRecord<>(getTopic(topic), viewToPayload(type, processId, processInstanceId, view)), (m, e) -> {
if (e != null) {
logError(view, e);
} else {
logger.debug("Sucessfuly async view sent view to topic {} type {} processId {} pid {} and view {}", topic, type, processId, processInstanceId, view);
}
});
} catch (Exception e) {
} catch (Throwable e) {
logError(view, e);
}
}

private void sendSync(String topic, String type, String processId, long processInstanceId, InstanceView<?> view) {
try {
logger.debug("Sending sync view to topic {} type {} processId {} pid {} and view {}", topic, type, processId, processInstanceId, view);
producer.send(new ProducerRecord<>(getTopic(topic), viewToPayload(type, processId, processInstanceId, view))).get();
logger.debug("Sucessfuly sync view sent view to topic {} type {} processId {} pid {} and view {}", topic, type, processId, processInstanceId, view);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
} catch (InterruptedException e) {
Expand All @@ -162,11 +172,10 @@ private void sendSync(String topic, String type, String processId, long processI
}
}

private void logError(InstanceView<?> view, Exception e) {
private void logError(InstanceView<?> view, Throwable e) {
logger.error("Error publishing view {}", view, e);
}


@Override
public void drop(Collection<InstanceView<?>> data) {
// nothing to do
Expand All @@ -191,7 +200,7 @@ private static Producer<String, byte[]> getProducer() {

private static String getTopic(String eventType) {
return System.getProperty("org.kie.jbpm.event.emitters.kafka.topic." + eventType, "jbpm-" + eventType +
"-events");
"-events");
}

protected static Map<String, Object> getProducerProperties() {
Expand Down

0 comments on commit b9a9134

Please sign in to comment.