From fef368851030964e5bd56c7182fc952e9cf34183 Mon Sep 17 00:00:00 2001 From: ramindu90 Date: Wed, 29 Mar 2017 23:02:32 +0530 Subject: [PATCH] Removing continuous commit logs --- .../input/transport/kafka/KafkaConsumerThread.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/modules/siddhi-extensions/input-transports/kafka-input-transport/src/main/java/org/wso2/siddhi/extension/input/transport/kafka/KafkaConsumerThread.java b/modules/siddhi-extensions/input-transports/kafka-input-transport/src/main/java/org/wso2/siddhi/extension/input/transport/kafka/KafkaConsumerThread.java index 4a513c9d77..48fdb703c3 100644 --- a/modules/siddhi-extensions/input-transports/kafka-input-transport/src/main/java/org/wso2/siddhi/extension/input/transport/kafka/KafkaConsumerThread.java +++ b/modules/siddhi-extensions/input-transports/kafka-input-transport/src/main/java/org/wso2/siddhi/extension/input/transport/kafka/KafkaConsumerThread.java @@ -107,9 +107,11 @@ public void run() { sourceEventListener.onEvent(event); } try { - consumer.commitAsync(); + if (!records.isEmpty()) { + consumer.commitAsync(); + } } catch (CommitFailedException e) { - log.error("Kafka commit failed for topic/s" + Arrays.toString(topics), e); + log.error("Kafka commit failed for topic kafka_result_topic", e); } } try {