Re Consuming KafkaJs #1697

Open
MAhmadIqbal opened this issue Jun 14, 2024 · 0 comments

Comments

@MAhmadIqbal

I need to re-consume messages from offsets that have already been committed.
Previously, my consumer processed the same messages twice after a certain period of time, so I added a unique parameter to the payload in my producers to check whether a message has already been consumed.
Now I need to re-consume messages starting from earlier offsets, but seek is not working: it fails with an error saying the offset is out of range.
I took the offsets from the same consumer and want to start from the last 50 committed messages. Those offsets are available in Confluent, so why am I getting this error?
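
One way to narrow down an "offset out of range" error is to compare the requested offsets against what the broker reports for each partition before seeking. The sketch below is only an illustration under assumptions: `clampRewind` and `resolveSeekTargets` are hypothetical helper names, not part of KafkaJS, and the response shapes of `admin.fetchTopicOffsets` and `admin.fetchOffsets` are assumed to match KafkaJS 2.x. It computes a target that is `count` messages before the committed offset and clamps it to the retained `[low, high]` range.

```javascript
// Hypothetical helper (not part of KafkaJS): pick a seek target `count`
// messages before the committed offset, clamped to the retained range.
// Offsets are strings in KafkaJS, so BigInt is used for the arithmetic.
function clampRewind({ committed, low, high }, count) {
  const lowN = BigInt(low);
  const highN = BigInt(high);
  let start = BigInt(committed);
  // Assumption: a negative committed offset means the group has no commit
  // for this partition, so fall back to the end of the log.
  if (start < 0n || start > highN) start = highN;
  let target = start - BigInt(count);
  if (target < lowN) target = lowN;
  return target.toString();
}

// Hypothetical helper: builds { partition: offset } from an already
// connected KafkaJS admin client (response shapes assumed from 2.x).
async function resolveSeekTargets(admin, groupId, topic, count) {
  const ranges = await admin.fetchTopicOffsets(topic);
  const [group] = await admin.fetchOffsets({ groupId, topics: [topic] });
  const committedByPartition = new Map(
    group.partitions.map((p) => [p.partition, p.offset])
  );
  const targets = {};
  for (const { partition, low, high } of ranges) {
    const committed = committedByPartition.get(partition) ?? "-1";
    targets[partition] = clampRewind({ committed, low, high }, count);
  }
  return targets;
}

// Example with made-up numbers: committed at 4950, log retains 0..5000.
console.log(clampRewind({ committed: "4950", low: "0", high: "5000" }, 50)); // → "4900"
```

If the target returned here differs from the offset that was passed to `seek`, the requested offset was probably outside the range the broker still holds for that partition, or it belongs to a different partition than expected.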

const ConsumerReminderExpertSms = async ({ topic = "test1" }) => {
  try {
    const consumerReminder = expertKafka.consumer({
      groupId: `Reminder22_Expert_Sms_${ENV}`,
    });
    await consumerReminder.connect();
    const specificOffsets = {
      0: '4950', // Offset for partition 0
      1: '2450'  // Offset for partition 1
    };

    await consumerReminder.subscribe({ topic: topic, fromBeginning: false });

    consumerReminder.on(consumerReminder.events.GROUP_JOIN, async () => {
      // Seek to the specific offset after joining the group
      try {
        for (const [partition, offset] of Object.entries(specificOffsets)) {
          await consumerReminder.seek({ topic, partition: Number(partition), offset: offset });
          console.log(`Seeking to offset ${offset} on partition ${partition} for topic ${topic}`);
        }
      } catch (err) {
        console.log("SEEK_ERROR", err);
      }
    });

    consumerReminder.run({
      autoCommit: false,

      eachMessage: async ({ topic, partition, message }) => {
        try {

Environment:

  • OS: [Windows 11]
  • KafkaJS version [2.2.4]
  • NodeJS version [e.g. 18]