Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] Add a new ack timeout mode where a callback is called instead of nacking the message #23262

Open
2 tasks done
lhotari opened this issue Sep 5, 2024 · 2 comments
Open
2 tasks done
Assignees
Labels
type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
Milestone

Comments

@lhotari
Copy link
Member

lhotari commented Sep 5, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

When a Pulsar application has ordered processing requirements, it's necessary to use either Failover, Exclusive or Key_Shared subscriptions. Ack timeouts shouldn't be used at all since this could cause messages to be processed in the wrong order. The application should take responsibility of handling possible error cases.

Since ack timeouts aren't used, there's a chance that the application logic contains a bug and the application doesn't acknowledge a message. Detecting this is very hard currently. It's hard to tell whether lost acks are caused by a Pulsar bug or feature or it's a problem caused by the application. This should be made easier.

Solution

Since Pulsar already contains the ack timeout feature, it would be natural to use it as the basis for detecting when the application is not acknowledging a message in time.

example of configuring the ack timeout handler:

consumerBuilder.ackTimeout(10, TimeUnit.SECOND)
               .ackTimeoutHandler((consumer, messageIds) -> messageIds.forEach(messageId -> log.warn("message with id {} wasn't acknowledged!", messageId))

interface:

import java.util.Set;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;

public interface AckHandler {
    void handleAckTimeout(Consumer<?> consumer, Set<MessageId> messageIds);
}

Alternatives

Anything else?

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@lhotari lhotari added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Sep 5, 2024
@lhotari lhotari added this to the 4.0.0 milestone Sep 5, 2024
@lhotari lhotari self-assigned this Sep 5, 2024
@crossoverJie
Copy link
Member

apache/pulsar-client-go#403 (comment)
apache/pulsar-client-go#217 (comment)

I have a doubt. Based on the discussion here, ackTimout is a legacy feature; do we still need to add new functionalities for this feature?

@lhotari
Copy link
Member Author

lhotari commented Sep 9, 2024

apache/pulsar-client-go#403 (comment) apache/pulsar-client-go#217 (comment)

I have a doubt. Based on the discussion here, ackTimout is a legacy feature; do we still need to add new functionalities for this feature?

@crossoverJie Thanks for sharing. Perhaps we rename this completely. :) However, it doesn't take away the usefulness of having a way to detect when the application is possibly misbehaving and missing to acknowledge a message.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
Projects
None yet
Development

No branches or pull requests

2 participants