diff --git a/jbpm-event-emitters/jbpm-event-emitters-kafka/src/main/java/org/jbpm/event/emitters/kafka/KafkaEventEmitter.java b/jbpm-event-emitters/jbpm-event-emitters-kafka/src/main/java/org/jbpm/event/emitters/kafka/KafkaEventEmitter.java index 5631bf66e9..6082f2a701 100644 --- a/jbpm-event-emitters/jbpm-event-emitters-kafka/src/main/java/org/jbpm/event/emitters/kafka/KafkaEventEmitter.java +++ b/jbpm-event-emitters/jbpm-event-emitters-kafka/src/main/java/org/jbpm/event/emitters/kafka/KafkaEventEmitter.java @@ -44,16 +44,16 @@ import com.fasterxml.jackson.databind.SerializationFeature; /** - * Kafka implementation of EventEmitter that simply pushes out data to several Kafka topics depending on InstanceView type. + * Kafka implementation of EventEmitter that simply pushes out data to several Kafka topics depending on InstanceView type. * * Expects following parameters to configure itself - via system properties * + *
  • org.kie.jbpm.event.emitters.kafka.date_format - date and time format to be sent to Kafka - default format is yyyy-MM-dd'T'HH:mm:ss.SSSZ
  • + *
  • org.kie.jbpm.event.emitters.kafka.bootstrap.servers - Kafka server ip, default is localhost:9092
  • + *
  • org.kie.jbpm.event.emitters.kafka.client.id - Kafka client id
  • + *
  • org.kie.jbpm.event.emitters.kafka.acks - Kafka acknowledge policy, check Kafka documentation
  • + *
  • org.kie.jbpm.event.emitters.kafka.topic.. Topic name for subscribing to these events. Defaults are "jbpm--events"
  • + * */ public class KafkaEventEmitter implements EventEmitter { @@ -62,7 +62,7 @@ public class KafkaEventEmitter implements EventEmitter { protected static final String KAFKA_EMITTER_PREFIX = "org.kie.jbpm.event.emitters.kafka."; private ObjectMapper mapper; - + private KafkaSender sender; private Producer producer; @@ -73,10 +73,10 @@ public KafkaEventEmitter() { KafkaEventEmitter(Producer producer) { this.producer = producer; - this.sender = Boolean.getBoolean(KAFKA_EMITTER_PREFIX+"sync") ? this::sendSync : this::sendAsync; - mapper = new ObjectMapper() + this.sender = Boolean.getBoolean(KAFKA_EMITTER_PREFIX + "sync") ? this::sendSync : this::sendAsync; + this.mapper = new ObjectMapper() .setDateFormat(new SimpleDateFormat(System.getProperty( - KAFKA_EMITTER_PREFIX+"date_format", System.getProperty( + KAFKA_EMITTER_PREFIX + "date_format", System.getProperty( "org.kie.server.json.date_format", "yyyy-MM-dd'T'HH:mm:ss.SSSZ")))) .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) @@ -95,38 +95,43 @@ public void apply(Collection> data) { } for (InstanceView view : data) { - String processId; - long processInstanceId; - String type; - String topic; - if (view instanceof ProcessInstanceView) { - ProcessInstanceView processInstanceView = (ProcessInstanceView) view; - topic = "processes"; - type = "process"; - processInstanceId = processInstanceView.getId(); - processId = processInstanceView.getProcessId(); - } else if (view instanceof TaskInstanceView) { - TaskInstanceView taskInstanceView = (TaskInstanceView) view; - topic = "tasks"; - type = "task"; - processInstanceId = taskInstanceView.getProcessInstanceId(); - processId = taskInstanceView.getProcessId(); - } else if (view instanceof CaseInstanceView) { - CaseInstanceView caseInstanceView = (CaseInstanceView) view; - topic = "cases"; - type = "case"; - processInstanceId = caseInstanceView.getId(); - processId = caseInstanceView.getCaseDefinitionId(); - } else { - logger.warn("Unsupported view type {}", view.getClass()); - continue; + try { + String processId; + long processInstanceId; + String type; + String topic; + if (view instanceof ProcessInstanceView) { + ProcessInstanceView processInstanceView = (ProcessInstanceView) view; + topic = "processes"; + type = "process"; + processInstanceId = processInstanceView.getId(); + processId = processInstanceView.getProcessId(); + } else if (view instanceof TaskInstanceView) { + TaskInstanceView taskInstanceView = (TaskInstanceView) view; + topic = "tasks"; + type = "task"; + processInstanceId = taskInstanceView.getProcessInstanceId(); + processId = taskInstanceView.getProcessId(); + } else if (view instanceof CaseInstanceView) { + CaseInstanceView caseInstanceView = (CaseInstanceView) view; + topic = "cases"; + type = "case"; + processInstanceId = caseInstanceView.getId(); + processId = caseInstanceView.getCaseDefinitionId(); + } else { + logger.warn("Unsupported view type {}", view.getClass()); + continue; + } + sender.send(topic, type, processId, processInstanceId, view); + } catch (Throwable th) { + logError(view, th); } - sender.send(topic, type, processId, processInstanceId, view); + } } - + private interface KafkaSender { - void send (String topic, String type, String processId, long processInstanceId, InstanceView view); + void send(String topic, String type, String processId, long processInstanceId, InstanceView view); } private byte[] viewToPayload(String type, String processId, long processInstanceId, InstanceView view) throws JsonProcessingException { @@ -134,20 +139,25 @@ private byte[] viewToPayload(String type, String processId, long processInstance } private void sendAsync(String topic, String type, String processId, long processInstanceId, InstanceView view) { + logger.debug("Sending async view to topic {} type {} processId {} pid {} and view {}", topic, type, processId, processInstanceId, view); try { producer.send(new ProducerRecord<>(getTopic(topic), viewToPayload(type, processId, processInstanceId, view)), (m, e) -> { if (e != null) { logError(view, e); + } else { + logger.debug("Sucessfuly async view sent view to topic {} type {} processId {} pid {} and view {}", topic, type, processId, processInstanceId, view); } }); - } catch (Exception e) { + } catch (Throwable e) { logError(view, e); } } private void sendSync(String topic, String type, String processId, long processInstanceId, InstanceView view) { try { + logger.debug("Sending sync view to topic {} type {} processId {} pid {} and view {}", topic, type, processId, processInstanceId, view); producer.send(new ProducerRecord<>(getTopic(topic), viewToPayload(type, processId, processInstanceId, view))).get(); + logger.debug("Sucessfuly sync view sent view to topic {} type {} processId {} pid {} and view {}", topic, type, processId, processInstanceId, view); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } catch (InterruptedException e) { @@ -162,11 +172,10 @@ private void sendSync(String topic, String type, String processId, long processI } } - private void logError(InstanceView view, Exception e) { + private void logError(InstanceView view, Throwable e) { logger.error("Error publishing view {}", view, e); } - @Override public void drop(Collection> data) { // nothing to do @@ -191,7 +200,7 @@ private static Producer getProducer() { private static String getTopic(String eventType) { return System.getProperty("org.kie.jbpm.event.emitters.kafka.topic." + eventType, "jbpm-" + eventType + - "-events"); + "-events"); } protected static Map getProducerProperties() {