From f4fa1a88e12499214c8436b10ddcbab8e32c0a27 Mon Sep 17 00:00:00 2001
From: Nigel Jones
Date: Tue, 30 Aug 2022 18:20:55 +0100
Subject: [PATCH] #6858 downgrade kafka in-loop debug info to DEBUG from INFO
 - too noisy

Signed-off-by: Nigel Jones
---
 .../kafka/KafkaOpenMetadataEventConsumer.java | 30 +++++++++----------
 .../kafka/KafkaOpenMetadataEventProducer.java | 20 ++++++-------
 2 files changed, 25 insertions(+), 25 deletions(-)

diff --git a/open-metadata-implementation/adapters/open-connectors/event-bus-connectors/open-metadata-topic-connectors/kafka-open-metadata-topic-connector/src/main/java/org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataEventConsumer.java b/open-metadata-implementation/adapters/open-connectors/event-bus-connectors/open-metadata-topic-connectors/kafka-open-metadata-topic-connector/src/main/java/org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataEventConsumer.java
index 38ee10e4ddb..96ac26db401 100644
--- a/open-metadata-implementation/adapters/open-connectors/event-bus-connectors/open-metadata-topic-connectors/kafka-open-metadata-topic-connector/src/main/java/org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataEventConsumer.java
+++ b/open-metadata-implementation/adapters/open-connectors/event-bus-connectors/open-metadata-topic-connectors/kafka-open-metadata-topic-connector/src/main/java/org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataEventConsumer.java
@@ -166,7 +166,7 @@ public void run()
                     //The connector queue is too big. Wait until the size goes down until
                     //polling again. If we let the events just accumulate, we will
                     //eventually run out of memory if the consumer cannot keep up.
-                    log.info("Skipping Kafka polling since unprocessed message queue size {} is greater than {}", nUnprocessedEvents, maxQueueSize);
+                    log.debug("Skipping Kafka polling since unprocessed message queue size {} is greater than {}", nUnprocessedEvents, maxQueueSize);
                     awaitNextPollingTime();
                     continue;
 
@@ -183,7 +183,7 @@ public void run()
                         String json = consumerRecord.value();
                         log.debug("Received message: {}" ,json);
                         countReceivedMessages++;
-                        log.info("Metrics: receivedMessages: {}", countReceivedMessages);
+                        log.debug("Metrics: receivedMessages: {}", countReceivedMessages);
                         final KafkaIncomingEvent event = new KafkaIncomingEvent(json, consumerRecord.offset());
                         final String recordKey=consumerRecord.key();
                         final String recordValue=consumerRecord.value();
@@ -194,12 +194,12 @@ public void run()
                                 addUnprocessedEvent(consumerRecord.partition(), consumerRecord.topic(), event);
                                 connector.distributeToListeners(event);
                                 countMessagesToProcess++;
-                                log.info("Metrics: messagesToProcess: {}", countMessagesToProcess);
+                                log.debug("Metrics: messagesToProcess: {}", countMessagesToProcess);
                             }
                             catch (Exception error)
                             {
                                 countMessagesFailedToProcess++;
-                                log.info("Metrics: messagesFailedToProcess: {}", countMessagesFailedToProcess);
+                                log.debug("Metrics: messagesFailedToProcess: {}", countMessagesFailedToProcess);
                                 log.warn("Error distributing inbound event: {}", error.getMessage());
 
                                 if (auditLog != null)
@@ -217,7 +217,7 @@ public void run()
                         {
                             log.debug("Ignoring message with key: {} and value: {}",recordKey, recordValue);
                             countIgnoredMessages++;
-                            log.info("Metrics: ignoredMessages: {}", countIgnoredMessages);
+                            log.debug("Metrics: ignoredMessages: {}", countIgnoredMessages);
                         }
                         if ( isAutoCommitEnabled)
                         {
@@ -231,14 +231,14 @@ public void run()
                                 final TopicPartition partition = new TopicPartition(consumerRecord.topic(),
                                                                                     consumerRecord.partition());
                                 currentOffsets.put(partition, new OffsetAndMetadata(consumerRecord.offset() + 1));
                                 countCommits++;
-                                log.info("Metrics: messageCommits: {}", countCommits);
+                                log.debug("Metrics: messageCommits: {}", countCommits);
                             }
                         }
                     }
                 catch (WakeupException e)
                 {
-                    log.info("Received wakeup call, proceeding with graceful shutdown");
+                    log.debug("Received wakeup call, proceeding with graceful shutdown");
                 }
                 catch (Exception error)
                 {
@@ -313,7 +313,7 @@ public void run()
             }
             consumer = null;
         }
-        log.debug("Exiting main loop for topic {} & cleaning up", this.topicToSubscribe);
+        log.info("Exiting main loop for topic {} & cleaning up", this.topicToSubscribe);
     }
 
 
@@ -358,7 +358,7 @@ private boolean checkForFullyProcessedMessages() {
         if (isAutoCommitEnabled) {
             return false;
         }
-        log.info("Checking for fully processed messages whose offsets need to be committed");
+        log.debug("Checking for fully processed messages whose offsets need to be committed");
 
         //Check all the queues to see they have events initial events
         //that are fully processed
@@ -375,7 +375,7 @@ private boolean checkForFullyProcessedMessages() {
 
         if (! commitData.isEmpty()) {
             currentOffsets.putAll(commitData);
-            log.info("Committing: {}", commitData);
+            log.debug("Committing: {}", commitData);
             try {
                 consumer.commitSync(commitData);
                 return true;
@@ -417,15 +417,15 @@ private KafkaIncomingEvent removeFullyProcessedEventsFromBeginningOfQueue(Queue<KafkaIncomingEvent> queue) {
             //The message at the beginning of the queue has been fully processed. Remove
             //it from the queue and repeat the check.
             lastRemoved = queue.remove();
-            log.info("Message with offset {} has been fully processed.",lastRemoved.getOffset() );
+            log.debug("Message with offset {} has been fully processed.",lastRemoved.getOffset() );
             countCommits++;
-            log.info("Metrics: commits: {}", countCommits);
+            log.debug("Metrics: commits: {}", countCommits);
         }
         KafkaIncomingEvent firstEvent = queue.peek();
         if (firstEvent != null) {
             //Queue is not empty, so we're waiting for the processing of first message in
             //the queue to finish
-            log.info("Waiting for completing of processing of message with offset {}",firstEvent.getOffset());
+            log.debug("Waiting for completing of processing of message with offset {}",firstEvent.getOffset());
         }
         return lastRemoved;
     }
@@ -551,12 +551,12 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         // Check if we need to rewind to handle initial startup case -- but only on first assignment
         try {
             if (initialPartitionAssignment) {
-                log.info("Received initial PartitionsAssigned event");
+                log.debug("Received initial PartitionsAssigned event");
 
                 long partitionCount = partitions.size();
 
                 if (partitionCount != 1) {
-                    log.info("Received PartitionsAssigned event with {} partitions. This is not supported.",partitionCount);
+                    log.warn("Received PartitionsAssigned event with {} partitions. This is not supported.",partitionCount);
                 } else {
                     // there is only one partition, so we can just grab the first one - and we'll try this once only
                     initialPartitionAssignment = false;
diff --git a/open-metadata-implementation/adapters/open-connectors/event-bus-connectors/open-metadata-topic-connectors/kafka-open-metadata-topic-connector/src/main/java/org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataEventProducer.java b/open-metadata-implementation/adapters/open-connectors/event-bus-connectors/open-metadata-topic-connectors/kafka-open-metadata-topic-connector/src/main/java/org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataEventProducer.java
index 28f7f7fa3f3..2cb9710351a 100644
--- a/open-metadata-implementation/adapters/open-connectors/event-bus-connectors/open-metadata-topic-connectors/kafka-open-metadata-topic-connector/src/main/java/org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataEventProducer.java
+++ b/open-metadata-implementation/adapters/open-connectors/event-bus-connectors/open-metadata-topic-connectors/kafka-open-metadata-topic-connector/src/main/java/org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataEventProducer.java
@@ -84,7 +84,7 @@ private void publishEvent(String event) throws ConnectorCheckedException {
         long eventRetryCount = 0;
 
         messagePublishRequestCount++;
-        log.info("Metrics: messagePublishRequestCount {}", messagePublishRequestCount);
+        log.debug("Metrics: messagePublishRequestCount {}", messagePublishRequestCount);
 
         if (producer == null) {
             try {
@@ -107,11 +107,11 @@ private void publishEvent(String event) throws ConnectorCheckedException {
                 log.debug("Sending message try {} [0 based] : {}", eventRetryCount,event);
                 ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, localServerId, event);
                 kafkaSendAttemptCount++;
-                log.info("Metrics: kafkaSendAttemptCount {}", kafkaSendAttemptCount);
+                log.debug("Metrics: kafkaSendAttemptCount {}", kafkaSendAttemptCount);
                 producer.send(producerRecord).get();
                 eventSent = true;
                 messageSendCount++;
-                log.info("Metrics: messageSendCount {}", messageSendCount);
+                log.debug("Metrics: messageSendCount {}", messageSendCount);
             } catch (ExecutionException error) {
                 kafkaSendFailCount++;
                 log.debug("Metrics: kafkaSendFailCount {}", kafkaSendFailCount);
@@ -129,7 +129,7 @@ private void publishEvent(String event) throws ConnectorCheckedException {
                     producer = null;
 
                     messageFailedSendCount++;
-                    log.info(messageFailedCountString, messageFailedSendCount);
+                    log.warn(messageFailedCountString, messageFailedSendCount);
 
                     throw new ConnectorCheckedException(
                             KafkaOpenMetadataTopicConnectorErrorCode.ERROR_SENDING_EVENT.getMessageDefinition(
@@ -141,7 +141,7 @@ private void publishEvent(String event) throws ConnectorCheckedException {
                         producer.close();
                         producer = null;
                         messageFailedSendCount++;
-                        log.info(messageFailedCountString, messageFailedSendCount);
+                        log.warn(messageFailedCountString, messageFailedSendCount);
                         log.error("Retryable Exception closed producer after {} tries", eventRetryCount);
                         break;
                     } else {
@@ -171,7 +171,7 @@ private void publishEvent(String event) throws ConnectorCheckedException {
             }
 
             messageFailedSendCount++;
-            log.info(messageFailedCountString, messageFailedSendCount);
+            log.warn(messageFailedCountString, messageFailedSendCount);
 
             throw new ConnectorCheckedException(
                     KafkaOpenMetadataTopicConnectorErrorCode.ERROR_SENDING_EVENT.getMessageDefinition(
@@ -225,7 +225,7 @@ public void run() {
                     }
                 }
             } catch (InterruptedException error) {
-                log.info("Woken up from sleep ");
+                log.debug("Woken up from sleep ");
                 Thread.currentThread().interrupt();
             } catch (Exception error) {
                 log.warn("Bad exception from sending events: {}",error.getMessage());
@@ -240,7 +240,7 @@ public void run() {
                 }
             }
         }
-        log.debug("Exiting main loop for topic {} & cleaning up", topicName);
+        log.info("Exiting main loop for topic {} & cleaning up", topicName);
 
         /* producer may have already closed by exception handler in publishEvent */
         if (producer != null) {
@@ -265,8 +265,8 @@ public void run() {
      */
     private void putEvent(String newEvent) {
         inmemoryPutMessageCount++;
-        log.info("Metrics: inmemoryPutMessageCount {}", inmemoryPutMessageCount);
-        log.info("Metrics: sendBufferSize {}", sendBuffer.size());
+        log.debug("Metrics: inmemoryPutMessageCount {}", inmemoryPutMessageCount);
+        log.debug("Metrics: sendBufferSize {}", sendBuffer.size());
         sendBuffer.add(newEvent);
     }
 