Consumer receives eachBatch over the maxBytes #1694

Open
akospaska opened this issue Jun 7, 2024 · 1 comment

Comments

@akospaska

Describe the bug
The consumer receives batches whose total size exceeds the configured maxBytes setting.

To Reproduce

Consumer setup:

const consumer = kafka.consumer({ groupId: "my-group", maxBytes: 2500 });

  await consumer.run({
    eachBatch: async ({ batch }) => {
      console.log(`Received batch with ${batch.messages.length} messages`);
    },
  });

Producer setup:

Here are the messages that I want to send to the topic.
Each message is around 1017 bytes.

const messages = [
  { value: { id: 21, name: "a".repeat(1000) } },
  { value: { id: 22, name: "a".repeat(1000) } },
  { value: { id: 23, name: "a".repeat(1000) } },
  { value: { id: 24, name: "a".repeat(1000) } },
  { value: { id: 25, name: "a".repeat(1000) } },
  { value: { id: 26, name: "a".repeat(1000) } },
  { value: { id: 27, name: "a".repeat(1000) } },
  { value: { id: 28, name: "a".repeat(1000) } },
  { value: { id: 29, name: "a".repeat(1000) } },
  { value: { id: 210, name: "a".repeat(1000) } },
].map((msg) => ({
  value: JSON.stringify(msg.value),
}));

Expected behavior:
The consumer should receive batches under 2500 bytes, so at most 2 messages per batch.
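The arithmetic behind that expectation can be checked with a short sketch. It measures only the serialized JSON value, using the payload shape and the 2500-byte limit from this report. The variable names are hypothetical, and Kafka's per-record and per-batch overhead is deliberately ignored, so the real on-wire size is somewhat larger.

```javascript
// Hypothetical variable names; only the payload shape and the 2500-byte
// limit are taken from the report above.
const maxBytes = 2500;

// Serialize one message value the same way the producer setup does.
const payload = JSON.stringify({ id: 21, name: "a".repeat(1000) });

// Byte length of the serialized value only (no record or batch overhead).
const payloadBytes = Buffer.byteLength(payload, "utf8");

// Largest whole number of such payloads that fits under the limit.
const maxMessagesPerBatch = Math.floor(maxBytes / payloadBytes);

console.log({ payloadBytes, maxMessagesPerBatch });
```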

Producer1 setup:

await producer.send({
  topic: "example-created-batch",
  messages,
});

In this case the consumer receives all the messages at once, so there are 10 messages in the batch.

Producer2 setup:

messages.map((message) =>
  producer.send({
    topic: "example-created-batch",
    messages: [message],
  })
);

This approach sends the messages individually.
In this case I get exactly what I want: each batch contains only 2 messages.

What am I missing?

Is there a way to send the messages in one request but still receive batches that are capped at the maxBytes size?

@sauravkr20

I am having the same issue with eachBatch. My consumer is configured as follows:


const consumer = this.kafkaInstance.kafka.consumer({
  // one webhook message is about 350-450 bytes
  // so it will wait 3 sec until there is more than one message, and will take up max of 4000 bytes in a batch
  groupId: group,
  maxBytesPerPartition: 1,
  minBytes: 400,
  maxWaitTimeInMs: 3000,
  retry: {
    retries: 5,
    restartOnFailure: async (err: Error) => {
      SlackAlertService.getInstance().notify(
        SlackChannels.PRIORITY_ERRORS,
        'kafka Consumer shut down due to max retry, restart manually after resolving',
        this.defaultLogger,
      );
      this.defaultLogger.error('Kafka Consumer Failure, restart manually after resolving: ', err);
      return false;
    },
  },
});

However, I am getting batches with the following message counts and total sizes:

Batch size: 1250, Byte size: 2156251 bytes
Batch size: 312, Byte size: 537889 bytes
Batch size: 157, Byte size: 270669 bytes
Batch size: 625, Byte size: 1077501 bytes
Batch size: 312, Byte size: 537889 bytes
Batch size: 156, Byte size: 268945 bytes
Batch size: 625, Byte size: 1077501 bytes
Batch size: 156, Byte size: 268945 bytes
Batch size: 625, Byte size: 1077501 bytes
Batch size: 625, Byte size: 1077501 bytes
Batch size: 313, Byte size: 539613 bytes
Batch size: 625, Byte size: 1077501 bytes
Batch size: 625, Byte size: 1077501 bytes
Batch size: 625, Byte size: 1077501 bytes
Batch size: 156, Byte size: 268945 bytes
Batch size: 156, Byte size: 268945 bytes
Batch size: 156, Byte size: 268945 bytes
Batch size: 156, Byte size: 268945 bytes
Batch size: 625, Byte size: 1077501 bytes
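For anyone trying to reproduce figures like these, the sketch below shows one way such a log line could be produced. It is an assumption about how the sizes were measured, not the code used above: `describeBatch` is a hypothetical helper that sums key and value bytes only, ignoring headers and protocol overhead.

```javascript
// Hypothetical helper: formats a summary line in the same shape as the log
// above. Sizes are the sum of key and value bytes; headers and Kafka's own
// record overhead are not counted (an assumption, see the lead-in).
function describeBatch(messages) {
  let bytes = 0;
  for (const { key, value } of messages) {
    bytes += key ? Buffer.byteLength(key) : 0;
    bytes += value ? Buffer.byteLength(value) : 0;
  }
  return `Batch size: ${messages.length}, Byte size: ${bytes} bytes`;
}
```

It could be called from `eachBatch` as `console.log(describeBatch(batch.messages))`.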
