Custom Partition Message Producer

Demonstrates how to dispatch messages to a Kafka topic using custom partition assignment.
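As a rough illustration of what key-based partition assignment can look like, here is a minimal, dependency-free sketch. The class name `CountryPartitionSketch`, the lookup table, and the hash fallback are assumptions made for this example and are not taken from the project's source. The table values simply mirror the key and partition pairs visible in the consumer log in this README. In a real producer, logic like this would typically live in the `partition` method of a class implementing `org.apache.kafka.clients.producer.Partitioner`, registered through the `partitioner.class` producer property.

```java
import java.util.Map;

// Hypothetical sketch: names, table contents, and fallback are illustrative assumptions.
final class CountryPartitionSketch {

    // Assumed country-code-to-partition table, mirroring the pairs seen in the README log.
    private static final Map<String, Integer> COUNTRY_TO_PARTITION = Map.of(
            "NL", 1, "BE", 2, "DE", 3, "FR", 4, "AT", 5, "IN", 6);

    // Returns the partition for a record key, given the topic's partition count.
    static int partitionFor(String countryCode, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Immutable maps reject null lookups, so handle a null key explicitly.
        Integer mapped = countryCode == null ? null : COUNTRY_TO_PARTITION.get(countryCode);
        if (mapped != null && mapped < numPartitions) {
            return mapped;
        }
        // Assumed fallback for unknown keys: non-negative hash modulo partition count.
        int hash = countryCode == null ? 0 : countryCode.hashCode();
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        for (String key : new String[] {"DE", "NL", "IN", "AT", "FR", "BE"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 10));
        }
    }
}
```

Because the mapping is deterministic, every message with the same key ends up on the same partition, which preserves per-key ordering for consumers.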

Listing messages from the topic

To verify that the messages are dispatched to different partitions, run the PartitionMessageConsumer class. In the output below, each key (a country code) arrives from its own partition.

2017-01-09 08:45:02,450 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [jobfeeds-1, jobfeeds-0, jobfeeds-9, jobfeeds-8, jobfeeds-7, jobfeeds-6, jobfeeds-5, jobfeeds-4, jobfeeds-3, jobfeeds-2] for group CountryGroup
2017-01-09 08:45:06,365 INFO  io.github.aparnachaudhary.PartitionMessageConsumer - RecordMetadata: Topic: jobfeeds Partition: 3 Offset: 14 Key: DE Value: Hello World Mon Jan 09 08:45:06 CET 2017
2017-01-09 08:45:06,365 INFO  io.github.aparnachaudhary.PartitionMessageConsumer - RecordMetadata: Topic: jobfeeds Partition: 1 Offset: 14 Key: NL Value: Hello World Mon Jan 09 08:45:06 CET 2017
2017-01-09 08:45:06,365 INFO  io.github.aparnachaudhary.PartitionMessageConsumer - RecordMetadata: Topic: jobfeeds Partition: 6 Offset: 3 Key: IN Value: Hello World Mon Jan 09 08:45:06 CET 2017
2017-01-09 08:45:06,365 INFO  io.github.aparnachaudhary.PartitionMessageConsumer - RecordMetadata: Topic: jobfeeds Partition: 5 Offset: 14 Key: AT Value: Hello World Mon Jan 09 08:45:06 CET 2017
2017-01-09 08:45:06,365 INFO  io.github.aparnachaudhary.PartitionMessageConsumer - RecordMetadata: Topic: jobfeeds Partition: 4 Offset: 14 Key: FR Value: Hello World Mon Jan 09 08:45:06 CET 2017
2017-01-09 08:45:06,365 INFO  io.github.aparnachaudhary.PartitionMessageConsumer - RecordMetadata: Topic: jobfeeds Partition: 2 Offset: 12 Key: BE Value: Hello World Mon Jan 09 08:45:06 CET 2017

To verify how partitions are assigned to different consumers, start multiple instances of PartitionMessageConsumer. With two instances running in the same group, the ten partitions are split between them, five each.

Consumer 1 - re-balancing:

2017-01-09 08:46:23,934 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [jobfeeds-0, jobfeeds-8, jobfeeds-6, jobfeeds-4, jobfeeds-2] for group CountryGroup

Consumer 2 - assignment:

2017-01-09 08:46:23,944 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [jobfeeds-1, jobfeeds-9, jobfeeds-7, jobfeeds-5, jobfeeds-3] for group CountryGroup
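
The split in the two logs above (even-numbered partitions on one consumer, odd-numbered on the other) is what a round-robin style distribution would produce. The sketch below models that idea in plain Java. It is only a simplified model under assumptions: the class name `RoundRobinAssignmentSketch` and the consumer labels are hypothetical, and in Kafka the actual assignment is computed by whichever partition assignor the consumer group is configured with, which this README does not show.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of dealing partitions to consumers in turn; not Kafka's own code.
final class RoundRobinAssignmentSketch {

    // Deals partitions 0..numPartitions-1 to the consumers one at a time, in order.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment; // nobody to assign to
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two group members and ten partitions, as in the logs above.
        System.out.println(assign(List.of("consumer-1", "consumer-2"), 10));
        // prints {consumer-1=[0, 2, 4, 6, 8], consumer-2=[1, 3, 5, 7, 9]}
    }
}
```

Calling `assign` with a single consumer returns all ten partitions for that consumer, matching the first log in this README, where one instance owns jobfeeds-0 through jobfeeds-9.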