
Sporadic timeouts from ConsumerOffsetCommitter.CommitRequest #809

Open
tedcaozoom opened this issue Jun 25, 2024 · 22 comments · Fixed by wushilin/parallel-consumer#1

@tedcaozoom

Our applications in GKE began to see sporadic timeouts with the following exceptions starting 6/7/2024. We have ruled out code problems and infrastructure problems, and we also tested the newest version of parallel consumer without success; we do not know what's going on.

java.util.concurrent.ExecutionException: io.confluent.parallelconsumer.internal.InternalRuntimeException: Timeout waiting for commit response PT30S to request ConsumerOffsetCommitter.CommitRequest(id=5d9ed827-0261-4a06-907c-4010bfe919c8, requestedAtMs=1718620540268)
at java.base/java.util.concurrent.FutureTask.report(Unknown Source)
at java.base/java.util.concurrent.FutureTask.get(Unknown Source)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.waitForClose(AbstractParallelEoSStreamProcessor.java:559)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.close(AbstractParallelEoSStreamProcessor.java:539)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.close(AbstractParallelEoSStreamProcessor.java:518)
at io.confluent.parallelconsumer.internal.DrainingCloseable.closeDontDrainFirst(DrainingCloseable.java:61)
at com.zoominfo.personprocessing.boot.conf.listeners.ParallelConsumerContainer.close(ParallelConsumerContainer.java:47)
at com.zoominfo.personprocessing.boot.conf.listeners.ParallelConsumerGroup.lambda$stop$1(ParallelConsumerGroup.java:99)
at java.base/java.lang.Iterable.forEach(Unknown Source)
at com.zoominfo.personprocessing.boot.conf.listeners.ParallelConsumerGroup.stop(ParallelConsumerGroup.java:97)
at org.springframework.context.support.DefaultLifecycleProcessor.doStop(DefaultLifecycleProcessor.java:248)
at org.springframework.context.support.DefaultLifecycleProcessor.access$300(DefaultLifecycleProcessor.java:54)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.stop(DefaultLifecycleProcessor.java:374)
at org.springframework.context.support.DefaultLifecycleProcessor.stopBeans(DefaultLifecycleProcessor.java:207)
at org.springframework.context.support.DefaultLifecycleProcessor.onClose(DefaultLifecycleProcessor.java:130)
at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1070)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.doClose(ServletWebServerApplicationContext.java:174)
at org.springframework.context.support.AbstractApplicationContext.close(AbstractApplicationContext.java:1024)
at org.springframework.boot.SpringApplicationShutdownHook.closeAndWait(SpringApplicationShutdownHook.java:145)
at java.base/java.lang.Iterable.forEach(Unknown Source)
at org.springframework.boot.SpringApplicationShutdownHook.run(SpringApplicationShutdownHook.java:114)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: io.confluent.parallelconsumer.internal.InternalRuntimeException: Timeout waiting for commit response PT30S to request ConsumerOffsetCommitter.CommitRequest(id=5d9ed827-0261-4a06-907c-4010bfe919c8, requestedAtMs=1718620540268)
at io.confluent.parallelconsumer.internal.InternalRuntimeException.msg(InternalRuntimeException.java:23)
at io.confluent.parallelconsumer.internal.ConsumerOffsetCommitter.commitAndWait(ConsumerOffsetCommitter.java:154)
at io.confluent.parallelconsumer.internal.ConsumerOffsetCommitter.commit(ConsumerOffsetCommitter.java:74)
at io.confluent.parallelconsumer.internal.BrokerPollSystem.retrieveOffsetsAndCommit(BrokerPollSystem.java:346)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.commitOffsetsThatAreReady(AbstractParallelEoSStreamProcessor.java:1231)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:622)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$10(AbstractParallelEoSStreamProcessor.java:751)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
... 1 common frames omitted

@tedcaozoom
Author

Also, the timeouts are not limited to the OffsetCommitter; here are some from sending messages:

at io.confluent.parallelconsumer.internal.UserFunctions.carefullyRun(UserFunctions.java:61)
at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.lambda$poll$0(ParallelEoSStreamProcessor.java:54)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$runUserFunctionInternal$17(AbstractParallelEoSStreamProcessor.java:1323)
at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:65)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.runUserFunctionInternal(AbstractParallelEoSStreamProcessor.java:1323)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.runUserFunction(AbstractParallelEoSStreamProcessor.java:1274)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$submitWorkToPoolInner$14(AbstractParallelEoSStreamProcessor.java:928)

java.lang.InterruptedException: null
at java.base/java.util.concurrent.FutureTask.awaitDone(Unknown Source)
at java.base/java.util.concurrent.FutureTask.get(Unknown Source)
at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:120)
at com.zoominfo.kafka.producer.ProtobufKafkaMessageSender.send(ProtobufKafkaMessageSender.java:43)
at com.zoominfo.personincremental.trace.KafkaTraceRegistry.register(KafkaTraceRegistry.java:50)
at com.zoominfo.traces.CompositeTraceRegistry.register(CompositeTraceRegistry.java:18)
at com.zoominfo.component.messaging.person.PersonBaseMessageHandler.handleMessage(PersonBaseMessageHandler.java:73)
at com.zoominfo.personprocessing.boot.conf.listeners.ParallelConsumerContainer.lambda$start$0(ParallelConsumerContainer.java:29)
at io.confluent.parallelconsumer.internal.UserFunctions.carefullyRun(UserFunctions.java:61)
at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.lambda$poll$0(ParallelEoSStreamProcessor.java:54)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$runUserFunctionInternal$17(AbstractParallelEoSStreamProcessor.java:1323)
at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:65)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.runUserFunctionInternal(AbstractParallelEoSStreamProcessor.java:1323)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.runUserFunction(AbstractParallelEoSStreamProcessor.java:1274)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$submitWorkToPoolInner$14(AbstractParallelEoSStreamProcessor.java:928)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)

@rkolesnev
Member

Hi @tedcaozoom,

Can you please provide more information on the architecture, configuration and environment?

Are there specific steps that reproduce the issue in a test / synthetic environment?
Alternatively, can you provide sample code that reproduces the observed issue?

Configuration

  • Parallel Consumer Options (Ordering mode, Transactions, Commit mode, Processing mode, Batching etc) - all options specified during instantiation / configuration of Parallel Consumer instance.

Consumer configuration

  • Producer configuration (if a producer is used, i.e. the Parallel Consumer processing mode is Consume and Produce or Consume and Produce Many)

Setup

  • Number of partitions

  • Number of Parallel Consumer instances and partitions per instance

  • Data flow details - consistent data flow or spiky? Any correlation between data flow changes and issue observed?

  • At a high level, what is the user function doing? Is it slow or fast? What is its approximate latency?

Details about the issue observed

  • How consistently is it observed?

  • Any specific steps that lead to the issue in question?

  • Does Parallel Consumer recover on its own after the issue, or does it crash / need to be restarted?

  • Is the issue observed on specific or different / random partitions?

  • Is the issue observed on specific or different Parallel Consumer instances?

Please provide logs spanning the period when the issue in question is observed.

Please provide any metrics collected for both the Kafka cluster / topic and the Parallel Consumer application (including KafkaConsumer and KafkaProducer) for the same period.

From the stack traces above:

  • the first stack trace looks like a timeout performing a commit during shutdown - why was it shutting down? Was there an issue with Kafka connectivity / broker state (maybe cluster rolls etc.)?

  • the second stack trace looks like an interrupted user function that was doing ProtobufKafkaMessageSender.send() - that looks like part of your application code, presumably some kind of producer wrapper; it timing out would suggest a more general Kafka connectivity issue rather than a Parallel Consumer specific one.
    Was that interrupt during draining of messages on shutdown, or during normal execution?

@tedcaozoom
Author

@rkolesnev I will try to get more info as soon as I can, but I have a more urgent need: our pods in GKE must be able to restart themselves when they encounter this problem (it basically kills our pods and we have to restart them manually; once restarted they work fine for a while). So I would like to know how to better build the health checks.

Currently we do the following and it is NOT working, so apparently ParallelConsumerGroup's containers are NOT closed or failed. How else can we do this? ParallelConsumerGroup.isRunning()? Or something else that indicates a problem? We need GKE to restart the pods itself to buy us time to diagnose this further.

    if (ParallelConsumerGroup.getContainers().stream().anyMatch(c -> c.getProcessor().isClosedOrFailed())) {
        return Health.down().build();
    }

@tedcaozoom
Author

Just to report: it appears that ParallelConsumerGroup.isRunning() works correctly.

So basically !ParallelConsumerGroup.isRunning() == needs a restart.

One more piece of information: when we encounter these timeouts, ParallelConsumerGroup stops running and ParallelConsumerGroup.isRunning() becomes FALSE.
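
For reference, a health check along these lines can be wired into Spring Boot Actuator roughly as sketched below (an assumed outline, not a drop-in implementation - ParallelConsumerGroup, getContainers() and getProcessor() are our own wrapper code, while isClosedOrFailed() is the parallel-consumer API). Exposing this indicator through the pod's liveness probe is what lets GKE restart the pod.

    import org.springframework.boot.actuate.health.Health;
    import org.springframework.boot.actuate.health.HealthIndicator;
    import org.springframework.stereotype.Component;

    // Assumed sketch - ParallelConsumerGroup is the custom wrapper discussed in this thread.
    @Component
    public class ParallelConsumerLivenessIndicator implements HealthIndicator {

        private final ParallelConsumerGroup group;

        public ParallelConsumerLivenessIndicator(ParallelConsumerGroup group) {
            this.group = group;
        }

        @Override
        public Health health() {
            boolean anyProcessorDead = group.getContainers().stream()
                    .anyMatch(c -> c.getProcessor().isClosedOrFailed());
            // Report DOWN when the group has stopped or any processor reports closed/failed.
            if (!group.isRunning() || anyProcessorDead) {
                return Health.down().build();
            }
            return Health.up().build();
        }
    }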

@rkolesnev
Member

rkolesnev commented Jul 1, 2024

Hmm, that is interesting - what is the ParallelConsumerGroup object?
The ParallelConsumer interface doesn't have an isRunning method, so I'm just curious what you are hooking into here.

Note that the internal ParallelStreamProcessor state can be RUNNING, PAUSED, CLOSING, CLOSED, etc., and RUNNING can be switched to PAUSED when back-pressure kicks in, i.e. if it is polling more data than it can process.

I guess it is some custom code on your side - containers / a parallel consumer group wrapping multiple parallel consumer instances.

@rkolesnev
Member

Just to reiterate - I cannot really help with the issue if I do not know what those things are, or what your setup, usage scenarios, configs, logs, versions, etc. are.

@gtassone
Contributor

Hello, I wish to provide some detail on this scenario.

First, ParallelConsumerGroup is a Spring Lifecycle container for PC processors. It is intended to support clean shutdown of the PC processors when one of them dies. The problem is that they are not dying successfully in 0.5.3.0; the last working version for clean shutdown was 0.5.2.5.

Second, the producer timeout is not via the PC producer; it is just happening within a PC processing thread, which suggests general Kafka connectivity instability.

So we are getting periodic connectivity issues / timeouts on commits to the Confluent/Kafka broker. In that case the PC processor cannot recover and (ideally) dies. The remediation is a health check to restart the k8s pod (new PC processors start up, attempt to rejoin the Kafka consumer group, etc.).
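
Roughly, the container is built on Spring's SmartLifecycle. The outline below is a simplified, assumed sketch of what such a wrapper looks like - not the exact code; ParallelConsumerContainer's start()/close() are our own wrapper methods around a single processor:

    import java.util.List;
    import org.springframework.context.SmartLifecycle;

    // Assumed outline only - not the actual application code.
    public class ParallelConsumerGroup implements SmartLifecycle {

        private final List<ParallelConsumerContainer> containers;
        private volatile boolean running;

        public ParallelConsumerGroup(List<ParallelConsumerContainer> containers) {
            this.containers = containers;
        }

        @Override
        public void start() {
            containers.forEach(ParallelConsumerContainer::start);
            running = true;
        }

        @Override
        public void stop() {
            // Matches the first stack trace: each container closes its processor,
            // e.g. via DrainingCloseable.closeDontDrainFirst().
            containers.forEach(ParallelConsumerContainer::close);
            running = false;
        }

        @Override
        public boolean isRunning() {
            return running;
        }

        public List<ParallelConsumerContainer> getContainers() {
            return containers;
        }
    }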

setup -
PC Options -

        final ParallelConsumerOptions<String, byte[]> options =
            ParallelConsumerOptions.<String, byte[]>builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED)
                .consumer(consumer)
                .maxConcurrency(consumerProps.concurrency())
                .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC)
                .commitInterval(java.time.Duration.ofMillis(1000))
                .build();

        ParallelStreamProcessor<String, byte[]> processor = ParallelStreamProcessor.createEosStreamProcessor(options);
        processor.subscribe(Arrays.asList(containerProps.getTopics()));

Concurrency is 64, with a single topic consumer, etc.

  • Partitions: 128
  • PC consumers/pods: 1-25, scaled based on load; there can be multiple topics/consumers per pod
  • Data flow is spiky - there can be long periods of high volume or of low/moderate volume
  • The user function is slow/expensive; latency can range from 100 ms to minutes per message

The issue occurs clustered (sometimes one PC consumer, sometimes multiple pods or topics) and appears related to general Kafka connectivity/stability. The Kafka cluster is a large shared cluster.
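
For reference, the processor configured above is then driven by registering the user function via poll(...) - this is the path visible in the stack traces as ParallelEoSStreamProcessor.lambda$poll$0 and UserFunctions.carefullyRun. A minimal sketch, where handleMessage stands in for the real handler and getSingleConsumerRecord() is assumed to be the single-record accessor on PollContext:

    // Sketch only: handleMessage is a placeholder for the real handler
    // (PersonBaseMessageHandler.handleMessage in the stack traces above).
    processor.poll(context -> {
        // Unwrap the single ConsumerRecord carried by this PollContext
        handleMessage(context.getSingleConsumerRecord());
    });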

Logs -

Error from poll control thread, will attempt controlled shutdown, then rethrow. Error: Timeout waiting for commit response PT30S to request ConsumerOffsetCommitter.CommitRequest(id=0d0121b0-2bf4-4a27-8a04-ead24d10219b, requestedAtMs=1721405653553)

logger_name
io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor
stack_trace
io.confluent.parallelconsumer.internal.InternalRuntimeException: Timeout waiting for commit response PT30S to request ConsumerOffsetCommitter.CommitRequest(id=0d0121b0-2bf4-4a27-8a04-ead24d10219b, requestedAtMs=1721405653553)
    at io.confluent.parallelconsumer.internal.InternalRuntimeException.msg(InternalRuntimeException.java:23)
    at io.confluent.parallelconsumer.internal.ConsumerOffsetCommitter.commitAndWait(ConsumerOffsetCommitter.java:154)
    at io.confluent.parallelconsumer.internal.ConsumerOffsetCommitter.commit(ConsumerOffsetCommitter.java:74)
    at io.confluent.parallelconsumer.internal.BrokerPollSystem.retrieveOffsetsAndCommit(BrokerPollSystem.java:346)
    at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.commitOffsetsThatAreReady(AbstractParallelEoSStreamProcessor.java:1262)
    at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.controlLoop(AbstractParallelEoSStreamProcessor.java:844)
    at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$9(AbstractParallelEoSStreamProcessor.java:799)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)

and in some cases

Unknown error


logger_name
io.confluent.parallelconsumer.internal.BrokerPollSystem
stack_trace
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing offsets {data-apps.pti-processing.incremental-102=OffsetAndMetadata{offset=28907371, leaderEpoch=null, metadata='bAJK/v//////////////////////////////////////////////////////////////////////7/f//b//v////v353v99Wy33OwMA'}}

In other cases it is the "Timeout waiting for commit response" during waitForClose().

@gtassone
Contributor

It seems acceptable for the PC processor to close during a Kafka connectivity failure. The bigger problem in this case is the failure to close successfully.

We hit this error state due to a connectivity failure:

log.error("Error from poll control thread, will attempt controlled shutdown, then rethrow. Error: " + e.getMessage(), e);

It then attempts doClose(timeout):

private void doClose(Duration timeout) throws TimeoutException, ExecutionException, InterruptedException {

It attempts to drain via brokerPollSubsystem.drain(),
but does not transition AbstractParallelEoSStreamProcessor.state first.
There is no way to invoke close without draining in this case, which would be preferable when we have a connectivity problem and cannot commit later anyway.

I can see from JDWP on the hung process:
AbstractParallelEoSStreamProcessor.state=RUNNING
AbstractParallelEoSStreamProcessor.isClosedOrFailed()=false
BrokerPollSystem.state=DRAINING
BrokerPollSystem.pausedForThrottling=true
BrokerPollSystem.controlThreadFuture=Completed Exceptionally with the org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing offsets

This state appears permanently hung and cannot be discovered with a health check on AbstractParallelEoSStreamProcessor.isClosedOrFailed().
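
For context, the public close API does offer a no-drain path (the first stack trace goes through DrainingCloseable.closeDontDrainFirst), but as described above the error-path shutdown inside the control loop drains regardless. A sketch of the caller-side options, assuming the DrainingCloseable defaults behave as their names suggest:

    // Caller-side close variants on ParallelStreamProcessor (a DrainingCloseable)
    processor.closeDrainFirst();     // let in-flight work finish and commit, then close
    processor.closeDontDrainFirst(); // skip draining and close as quickly as possible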

@gtassone
Contributor

I can also note that this invocation of doClose(timeout) does not apply the timeout when calling brokerPollSubsystem.drain(), does not use waitForClose(), and does not allow any control of drainMode. I should add that awaitingInflightProcessingCompletionOnShutdown=false, so execution never gets far enough to set it to true.
It appears hung on brokerPollSubsystem.drain(), hitting the commit timeout with no monitor or enclosing timeout.

@gtassone
Contributor

Running with additional logging - when stuck, it looks like this happens during the shutdown attempt:

logger_name | io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor
thread_name | pc-control
Inflight work in execution pool: 32, letting to finish on shutdown with timeout: PT10S
Thread execution pool termination await timeout (PT10S)! Were any processing jobs dead locked (test latch locks?) or otherwise stuck? Forcing shutdown of workers.
Clean execution pool termination failed - some threads still active despite await and interrupt - is user function swallowing interrupted exception? Threads still not done count: 32


 
--
logger_name | io.confluent.parallelconsumer.internal.BrokerPollSystem
thread_name | pc-broker-poll
Unknown error

stacktrace | org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1303)
...
	at io.confluent.parallelconsumer.internal.AbstractOffsetCommitter.retrieveOffsetsAndCommit(AbstractOffsetCommitter.java:40)
	at io.confluent.parallelconsumer.internal.ConsumerOffsetCommitter.maybeDoCommit(ConsumerOffsetCommitter.java:175)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.maybeDoCommit(BrokerPollSystem.java:357)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.controlLoop(BrokerPollSystem.java:138)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)

Sometimes that last unknown error in BrokerPollSystem is a commit timeout instead.

So there are probably some timeout adjustments or user-function interrupt handling that we can make to improve our chances of a clean shutdown, but I think it should never be able to reach a state of RUNNING/DRAINING when the critical control loop threads and futures are terminated. Maybe it could call executorService.shutdownNow() after the grace period, give up on unresponsive threads, and go to the CLOSED state, or something like that?
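
The forced-shutdown idea above is essentially the standard two-phase ExecutorService shutdown pattern from the JDK documentation. A generic sketch (the 10-second grace period is illustrative, and this is not PC's actual shutdown code):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    // Generic two-phase shutdown: allow a grace period, then force termination.
    static void shutdownAndAwait(ExecutorService pool) {
        pool.shutdown(); // stop accepting new tasks
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // interrupt in-flight tasks
                if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                    // Workers ignored the interrupt (e.g. a user function swallowing
                    // InterruptedException) - give up and surface the failure upstream.
                    System.err.println("Executor did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }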

@gtassone
Contributor

And probably, if BrokerPollSystem.controlThreadFuture completed exceptionally, that needs to propagate through the control mechanisms and change the processor state.

@gtassone
Contributor

To sum up -

  1. doClose() is not executed safely (within a try block) during the try { controlLoop() } catch exception handling.
  2. doClose() does not transition to the CLOSED state until the end of the method.
  3. Any unexpected exception during doClose() will prevent the state transition to CLOSED while terminating the controlLoop task. This leaves state = RUNNING and the controlTask terminated with an unhandled exception.
  4. The above leaves an external system unable to monitor the accurate state of the terminated ParallelEoSStreamProcessor.
  5. There is a similar problem with the BrokerPollSystem control thread.
  6. The above circumstance is likely when there is a sequence of multiple Kafka client exceptions or timeout exceptions, e.g. with a Kafka client connectivity failure or if the broker dropped the consumer from the group.

What should happen -

  1. The PC processor should ensure accurate state transitions even when unexpected exceptions occur.
  2. Top-level exception handling for the controlTask: any otherwise unhandled exception on the control thread should be caught and result in a state transition, either to CLOSED or to a newly introduced DEGRADED state. This should be discoverable by an external system via isClosedOrFailed() or otherwise; rethrow after the state transition if you want (see the sketch below).
  3. The same applies to the BrokerPollSystem.
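
To make point 2 concrete (the sketch referenced there), the suggested top-level guard could look roughly like the following. This is a hypothetical outline of the proposed behaviour - State, transitionTo(), doClose() and shutdownTimeout are stand-ins for PC internals, not actual parallel-consumer code:

    // Hypothetical outline of a top-level guard around the control task.
    Runnable controlTask = () -> {
        try {
            controlLoop();
        } catch (Throwable t) {
            log.error("Control loop failed, attempting controlled shutdown", t);
            try {
                doClose(shutdownTimeout);
            } catch (Throwable closeError) {
                log.error("Close sequence itself failed", closeError);
            } finally {
                // Always end in an externally visible terminal state so that
                // health checks on isClosedOrFailed() see the dead processor.
                transitionTo(State.CLOSED);
            }
            throw new RuntimeException("Error from control thread", t);
        }
    };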

@gtassone
Contributor

Here are some logs from the fix - note the sequence of multiple Kafka client exceptions that would have caused doClose() to abort:

message | Unknown error
logger_name | io.confluent.parallelconsumer.internal.BrokerPollSystem
stack_trace
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. 
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1303)
...
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.maybeDoCommit(BrokerPollSystem.java:357)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.controlLoop(BrokerPollSystem.java:138)


message | Error from poll control thread, will attempt controlled shutdown, then rethrow. Error: Timeout waiting for commit response PT30S to request ConsumerOffsetCommitter.CommitRequest(id=bbc39e55-f0c2-4898-8c44-f59ce23f6eda, requestedAtMs=1722011062299)
logger_name | io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor
stack_trace
io.confluent.parallelconsumer.internal.InternalRuntimeException: Timeout waiting for commit response PT30S to request ConsumerOffsetCommitter.CommitRequest(id=bbc39e55-f0c2-4898-8c44-f59ce23f6eda, requestedAtMs=1722011062299)
...
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.commitOffsetsThatAreReady(AbstractParallelEoSStreamProcessor.java:1282)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.controlLoop(AbstractParallelEoSStreamProcessor.java:864)

message | failed to commit during close sequence
logger_name | io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor
stack_trace
io.confluent.parallelconsumer.internal.InternalRuntimeException: Timeout waiting for commit response PT30S to request ConsumerOffsetCommitter.CommitRequest(id=47fd9040-1c16-4db3-85cc-ca8772b0fa24, requestedAtMs=1722011122313)
...
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.commitOffsetsThatAreReady(AbstractParallelEoSStreamProcessor.java:1282)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.innerDoClose(AbstractParallelEoSStreamProcessor.java:699)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:633)

message | Execution or timeout exception waiting for broker poller thread to finish
logger_name | io.confluent.parallelconsumer.internal.BrokerPollSystem
stack_trace
java.util.concurrent.ExecutionException: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
...
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.closeAndWait(BrokerPollSystem.java:277)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.innerDoClose(AbstractParallelEoSStreamProcessor.java:706)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:633)

message | failed to close brokerPollSubsystem during close sequence
logger_name | io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor
stack_trace
java.util.concurrent.ExecutionException: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
...
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.closeAndWait(BrokerPollSystem.java:277)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.innerDoClose(AbstractParallelEoSStreamProcessor.java:706)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:633)

message | PC closed due to error: java.lang.RuntimeException: Error from poll control thread: Timeout waiting for commit response PT30S to request 
logger_name | io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor
thread_name | pc-control

@gtassone
Contributor

gtassone commented Aug 8, 2024

@rkolesnev I opened that as a PR (#818) - feel free to take it over or suggest any alternative.

@rkolesnev
Member

Thank you @gtassone - I have started to map out a deeper fix, but I am really struggling with time. I think the suggested PR is a good stop-gap solution to at least let it close completely and avoid getting stuck.
I have left a couple of minor comments / change suggestions on the PR.

gtassone added a commit to gtassone/parallel-consumer that referenced this issue Aug 13, 2024
Fixes confluentinc#809

Co-authored-by: Roman Kolesnev <romankolesnev@gmail.com>
@gtassone
Contributor

@rkolesnev - updated the PR: addressed the comments, squash-rebased, and made my GitHub user the author. Let me know if any additional changes are needed.

@rkolesnev rkolesnev reopened this Aug 14, 2024
@rkolesnev
Member

rkolesnev commented Aug 14, 2024

I will keep the issue open for further work on it - but the PR #818 is merged.
Thank you @gtassone.

@gtassone
Contributor

Thanks @rkolesnev - how does your release process work? Can we expect a patch release for this fix?

@rkolesnev
Member

Seems it got closed again automatically/incorrectly due to PR hooks - reopening.

@rkolesnev rkolesnev reopened this Aug 16, 2024
@rkolesnev
Member

rkolesnev commented Aug 16, 2024

@gtassone - yep, I think it is worthwhile to cut a release with this fix. @wushilin is working on another angle on the same issue for sync commits - adding an option to retry them; since those are sync commits and processing is halted, I think it would be nice to have that option.
Once it's done / reviewed / merged I will cut a release - I would guess roughly towards the end of next week or the start of the following week.

The release process is pretty informal - we try to bundle enhancements / fixes / dependency updates into a release, so we are not doing daily / weekly builds, but for important fixes or enhancements we would release sooner rather than later; the process is fairly automated.

skl83 pushed a commit to skl83/parallel-consumer that referenced this issue Sep 2, 2024
@rkolesnev
Member

@gtassone - this is released now in 0.5.3.1
