Error on closing Parallel Consumer #459

Open
jatindersthind opened this issue Oct 27, 2022 · 8 comments

Comments

@jatindersthind

jatindersthind commented Oct 27, 2022

When I try to close the Parallel Consumer using parallelConsumer.close(),
I get the errors below:

[INFO ] 2022-10-20 23:03:00.661 [Thread-3] communication.kafka.KafkaConnector - Closing ParallelConsumer for url: localhost:9092, topic: MY-TOPIC-1
[ERROR] 2022-10-20 23:03:30.662 [pc-control] io.confluent.parallelconsumer.internal.BrokerPollSystem - **Execution or timeout exception waiting for broker poller thread to finish**
java.util.concurrent.TimeoutException: null
	at java.util.concurrent.FutureTask.get(FutureTask.java:205) ~[?:1.8.0_252]
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.closeAndWait(BrokerPollSystem.java:244) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:515) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.controlLoop(AbstractParallelEoSStreamProcessor.java:698) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$5(AbstractParallelEoSStreamProcessor.java:639) ~[service-1.0.0.jar:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]


[ERROR] 2022-10-20 23:03:30.676 [pc-control] io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor - **Error from poll control thread, will attempt controlled shutdown, then rethrow. Error: null**
java.util.concurrent.TimeoutException: null
	at java.util.concurrent.FutureTask.get(FutureTask.java:205) ~[?:1.8.0_252]
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.closeAndWait(BrokerPollSystem.java:244) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:515) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.controlLoop(AbstractParallelEoSStreamProcessor.java:698) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$5(AbstractParallelEoSStreamProcessor.java:639) ~[service-1.0.0.jar:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]


[ERROR] 2022-10-20 23:03:30.682 [Thread-3] io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor - **Execution or timeout exception while waiting for the control thread to close cleanly (state was closing). Try increasing your time-out to allow the system to drain, or close without draining.**
java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Can't commit - not running (state: closed
	at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_252]
	at java.util.concurrent.FutureTask.get(FutureTask.java:206) ~[?:1.8.0_252]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.waitForClose(AbstractParallelEoSStreamProcessor.java:464) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.close(AbstractParallelEoSStreamProcessor.java:446) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.DrainingCloseable.closeDontDrainFirst(DrainingCloseable.java:60) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.close(AbstractParallelEoSStreamProcessor.java:424) ~[service-1.0.0.jar:?]
	at communication.kafka.KafkaConnector.stop(KafkaConnector.java:935) ~[service-1.0.0.jar:?]
	at communication.kafka.KafkaClient.unsubscribe(KafkaClient.java:74) ~[service-1.0.0.jar:?]
Caused by: java.lang.IllegalStateException: Can't commit - not running (state: closed
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.retrieveOffsetsAndCommit(BrokerPollSystem.java:315) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.commitOffsetsThatAreReady(AbstractParallelEoSStreamProcessor.java:1089) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:511) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$5(AbstractParallelEoSStreamProcessor.java:642) ~[service-1.0.0.jar:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
	... 1 more


[ERROR] 2022-10-20 23:03:30.684 [Thread-3] communication.kafka.KafkaConnector - **Error closing parallelConsumer
java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Can't commit - not running (state: closed**
	at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_252]
	at java.util.concurrent.FutureTask.get(FutureTask.java:206) ~[?:1.8.0_252]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.waitForClose(AbstractParallelEoSStreamProcessor.java:464) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.close(AbstractParallelEoSStreamProcessor.java:446) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.DrainingCloseable.closeDontDrainFirst(DrainingCloseable.java:60) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.close(AbstractParallelEoSStreamProcessor.java:424) ~[service-1.0.0.jar:?]
	at communication.kafka.KafkaConnector.stop(KafkaConnector.java:935) ~[service-1.0.0.jar:?]
	at communication.kafka.KafkaClient.unsubscribe(KafkaClient.java:74) ~[service-1.0.0.jar:?]
	at BaseremoveKafkaListener(Basejava:133) ~[service-1.0.0.jar:?]
	at group.handler.MicroServiceHandler.removeAllGroups(MicroServiceHandler.java:92) ~[service-1.0.0.jar:?]
	at group.handler.MicroServiceHandler.doHandle(MicroServiceHandler.java:66) ~[service-1.0.0.jar:?]
	at group.handler.MicroServiceHandler.doHandle(MicroServiceHandler.java:19) ~[service-1.0.0.jar:?]
	at communication.message.handler.MessageHandler.handle(MessageHandler.java:28) ~[service-1.0.0.jar:?]
	at communication.message.processor.MessageProcessor.process(MessageProcessor.java:17) ~[service-1.0.0.jar:?]
	at communication.kafka.KafkaClient$KafkaProcessingInterfaceImpl.process(KafkaClient.java:89) ~[service-1.0.0.jar:?]
	at communication.kafka.KafkaMessageProcessingTask.run(KafkaMessageProcessingTask.java:26) ~[service-1.0.0.jar:?]
	at communication.kafka.KafkaConnector$3.run(KafkaConnector.java:463) ~[service-1.0.0.jar:?]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
Caused by: java.lang.IllegalStateException: Can't commit - not running (state: closed
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.retrieveOffsetsAndCommit(BrokerPollSystem.java:315) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.commitOffsetsThatAreReady(AbstractParallelEoSStreamProcessor.java:1089) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:511) ~[service-1.0.0.jar:?]
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$5(AbstractParallelEoSStreamProcessor.java:642) ~[service-1.0.0.jar:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
	... 1 more
@astubbs
Contributor

astubbs commented Nov 1, 2022

What version?

@jatindersthind
Author

jatindersthind commented Nov 2, 2022

> What version?

0.5.2.1

@astubbs Also, we see several different types of errors that cause the Parallel Consumer to shut down, but our Java process keeps running. Is there any way to shut down our process when that happens? Maybe we could get an event from some kind of shutdown listener on parallelConsumer?

@astubbs
Contributor

astubbs commented Nov 2, 2022

Yes, you can periodically call the isClosedOrFailed method.

A shutdown listener is an interesting suggestion - feel free to create a feature request. There isn't anything like that atm.

Please update to the latest version, then try again?
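
A minimal sketch of the periodic check suggested above, in case it helps. `ConsumerWatchdog` is a hypothetical helper and not part of Parallel Consumer; it only assumes you can pass it something like `processor::isClosedOrFailed` as the check, plus whatever you want to run once the consumer is found closed or failed.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: polls a "closed or failed" check and fires a callback once. */
final class ConsumerWatchdog implements AutoCloseable {
    // Assumed wiring: processor::isClosedOrFailed
    private final BooleanSupplier closedOrFailed;
    // What to do once the consumer is down, e.g. () -> System.exit(1)
    private final Runnable onDown;
    private final AtomicBoolean fired = new AtomicBoolean(false);
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "pc-watchdog");
                t.setDaemon(true); // the watchdog itself must not keep the JVM alive
                return t;
            });

    ConsumerWatchdog(BooleanSupplier closedOrFailed, Runnable onDown) {
        this.closedOrFailed = closedOrFailed;
        this.onDown = onDown;
    }

    /** Starts polling at a fixed delay. */
    void start(long period, TimeUnit unit) {
        scheduler.scheduleWithFixedDelay(this::checkOnce, period, period, unit);
    }

    /** Runs a single check; returns true only on the call that triggers the callback. */
    boolean checkOnce() {
        if (closedOrFailed.getAsBoolean() && fired.compareAndSet(false, true)) {
            scheduler.shutdown(); // stop further polling
            onDown.run();
            return true;
        }
        return false;
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Usage would look something like `new ConsumerWatchdog(processor::isClosedOrFailed, () -> System.exit(1)).start(5, TimeUnit.SECONDS);`, where `processor` is your consumer instance and the 5-second period is an arbitrary choice.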

@jatindersthind
Author

jatindersthind commented Nov 3, 2022

> Please update to the latest version, then try again?

Sure, I will try updating to 0.5.2.3.
I will also try adding periodic isClosedOrFailed checks.

@astubbs
Contributor

astubbs commented Nov 3, 2022

We're also about to release a metrics package which might be relevant - so stay tuned on:

@jatindersthind
Author

> We're also about to release a metrics package which might be relevant - so stay tuned on:

Sure.

> Yes, you can periodically call the isClosedOrFailed method.

I just need to confirm one thing: is this method only available in ParallelEoSStreamProcessor (AbstractParallelEoSStreamProcessor), and not at the interface level (ParallelConsumer or ParallelStreamProcessor)?
If so, we will have to cast to ParallelEoSStreamProcessor.

@astubbs
Contributor

astubbs commented Nov 4, 2022 via email

@astubbs
Contributor

astubbs commented Nov 9, 2022

Can you include earlier logs? The errors you showed came from trying to shut down when the client was already closed. I can add some checks to avoid those.
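
Until such checks exist in the library, a caller-side guard along these lines might avoid calling close on a consumer that has already closed. This is only a sketch: `SafeClose` is a hypothetical helper, and it assumes the check and the close call can be passed in as `processor::isClosedOrFailed` and `processor::close`. It is best-effort, since the consumer can still close between the check and the call.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper: skips close() when the consumer already reports closed or failed. */
final class SafeClose {
    private SafeClose() {}

    /**
     * @param closedOrFailed assumed wiring: processor::isClosedOrFailed
     * @param close          assumed wiring: processor::close
     * @return true if close was invoked, false if it was skipped
     */
    static boolean closeIfRunning(BooleanSupplier closedOrFailed, Runnable close) {
        if (closedOrFailed.getAsBoolean()) {
            return false; // already closed or failed, nothing to do
        }
        close.run();
        return true;
    }
}
```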
