11. KAFKA : Sample Program9 : Consumer : RebalanceListener with Consumer
Consumer
##########################
- A consumer that subscribes to a topic must belong to a Consumer Group (set via group.id); groups are what make consumption scalable
- By default, a single consumer in a group gets data from all partitions of the topic.
- If partitions are added to the topic, or a consumer joins or leaves the group => an activity called Rebalancing is triggered within that single Consumer Group
Rebalancing => done by an algorithm called the partition assignor (e.g. RoundRobin, Range)
After Rebalancing -> a partition may go to a different consumer than the one that was working on it earlier
- 2 consumers within the same consumer group cannot operate on a single partition (e.g. P0), but a consumer from another consumer group can operate on it
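To make the difference between Range and RoundRobin concrete, here is a minimal sketch in plain Java. It is a simplified single-topic model, not Kafka's own code; the class name AssignmentSketch and the consumer names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, single-topic model of the two assignment strategies.
// Not Kafka's implementation; it only mirrors the distribution pattern.
class AssignmentSketch {

    // Range: every consumer gets one contiguous block of partitions.
    static Map<String, List<Integer>> range(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int perConsumer = partitionCount / consumers.size();
        int extra = partitionCount % consumers.size(); // first 'extra' consumers get one more
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int size = perConsumer + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = next; p < next + size; p++) {
                owned.add(p);
            }
            next += size;
            result.put(consumers.get(i), owned);
        }
        return result;
    }

    // Round robin: partitions are dealt out one at a time, like cards.
    static Map<String, List<Integer>> roundRobin(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            result.get(consumers.get(p % consumers.size())).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> group = Arrays.asList("consumer1", "consumer2");
        System.out.println("range      : " + range(group, 10));
        System.out.println("roundRobin : " + roundRobin(group, 10));
    }
}
```

With 10 partitions and 2 consumers, range gives consumer1 partitions 0-4 and consumer2 partitions 5-9, while roundRobin gives consumer1 the even partitions and consumer2 the odd ones. Either way no partition is owned by two consumers of the same group.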
Group Coordinator
----------------
Kafka side : for each consumer group, 1 broker acts as the Group Coordinator
Consumer side : in each consumer group, 1 consumer acts as the Group Leader, usually the 1st consumer to join
From the consumer group - all consumers send heartbeats to the broker (Group Coordinator) in Kafka
The Group Leader of the consumer group runs the partition assignor
Broker (Group Coordinator) => asks for a rebalance, because the broker tracks the group members through their join requests and heartbeats. E.g. if the consumer group had 3 consumers and we add 1 more, the broker now sees 4 members, so it initiates Rebalancing => then the Group Leader decides which consumer works on which partition
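The broker-side bookkeeping described above can be pictured with a toy model. This is only a sketch under simplifying assumptions: the class CoordinatorSketch, its methods and the 10-second timeout are hypothetical, and real Kafka uses dedicated join and heartbeat requests and batches joins into one rebalance.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the broker side: it remembers when each member was last heard from
// and starts a new generation (a rebalance) whenever the membership changes.
class CoordinatorSketch {
    private final long sessionTimeoutMs;
    private final Map<String, Long> lastHeartbeat = new HashMap<>();
    private int generation = 0; // incremented once per simulated rebalance

    CoordinatorSketch(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // A new consumer joining changes the membership, so a rebalance starts.
    void join(String memberId, long nowMs) {
        if (lastHeartbeat.put(memberId, nowMs) == null) {
            generation++;
        }
    }

    // Known members only refresh their timestamp; no rebalance is needed.
    void heartbeat(String memberId, long nowMs) {
        if (lastHeartbeat.containsKey(memberId)) {
            lastHeartbeat.put(memberId, nowMs);
        }
    }

    // Members silent for longer than the session timeout are dropped,
    // which also changes the membership and starts a rebalance.
    void expireSilentMembers(long nowMs) {
        boolean removed = lastHeartbeat.values().removeIf(seen -> nowMs - seen > sessionTimeoutMs);
        if (removed) {
            generation++;
        }
    }

    int generation() {
        return generation;
    }

    int memberCount() {
        return lastHeartbeat.size();
    }

    public static void main(String[] args) {
        CoordinatorSketch coordinator = new CoordinatorSketch(10_000);
        coordinator.join("c1", 0);
        coordinator.join("c2", 0);
        coordinator.join("c3", 0);
        int before = coordinator.generation();
        coordinator.join("c4", 2_000); // 4th consumer added to the group
        System.out.println("members=" + coordinator.memberCount()
                + ", rebalanced=" + (coordinator.generation() > before));
    }
}
```

The point of the sketch is the trigger condition: ordinary heartbeats leave the generation unchanged, while a join or a missed session timeout bumps it.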
Offset
-------
Current offset: position of the next message the consumer will read; we manage it so that the same consumer does not receive the same msg twice
Committed offset: the offset saved to Kafka as already processed; after rebalancing, the new consumer resumes from it, so it does not read those messages again
1. Auto commit => auto.commit.interval.ms : 5000 (default; it autocommits every 5 sec)
2. Manual commit
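The two commit modes are selected through consumer properties. A minimal sketch using java.util.Properties follows; the property keys are standard Kafka consumer settings, while the class name ConsumerConfigSketch, the group name and the localhost:9092 broker address are assumptions for illustration.

```java
import java.util.Properties;

// Builds consumer properties for the two commit modes.
// The resulting Properties object is what would be passed to a KafkaConsumer.
class ConsumerConfigSketch {

    // Manual commit: the application decides when offsets are committed.
    static Properties manualCommitConfig(String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("enable.auto.commit", "false");
        return props;
    }

    // Auto commit: offsets are committed in the background every 5 seconds.
    static Properties autoCommitConfig(String groupId) {
        Properties props = manualCommitConfig(groupId);
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(autoCommitConfig("RandomGroup").getProperty("auto.commit.interval.ms"));
        System.out.println(manualCommitConfig("RandomGroup").getProperty("enable.auto.commit"));
    }
}
```

Manual commit is the mode that pairs with a rebalance listener, because the application then controls exactly which offset is saved before a partition is handed over.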
50 ------------60
| |
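Assume 6 messages of the batch 50 - 60 have been processed when Rebalancing happens. Those 6 are already read, but nothing was committed, so once the new consumer takes over the partition it will start again from 50 and process them a second time.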
For this we can have a RebalanceListener -> once a Rebalance triggers -> finish writing whatever data has been read (6 msgs) -> commit their offsets before the partition is handed over
Callbacks: onPartitionsAssigned(), onPartitionsRevoked()
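The 50 - 60 scenario can be replayed in a small plain-Java simulation. It is a sketch, not the RandomConsumer.java from this post: the RebalanceCallbacks interface, OffsetStore and SimConsumer are hypothetical stand-ins for Kafka's ConsumerRebalanceListener, the committed offsets kept by the broker, and a consumer instance.

```java
import java.util.HashMap;
import java.util.Map;

class RebalanceCommitSketch {

    // Hypothetical stand-in for the two rebalance callbacks.
    interface RebalanceCallbacks {
        void onPartitionsRevoked(int partition);
        void onPartitionsAssigned(int partition);
    }

    // Stand-in for the committed offsets stored per partition.
    static final class OffsetStore {
        private final Map<Integer, Long> committed = new HashMap<>();

        void commit(int partition, long nextOffset) {
            committed.put(partition, nextOffset);
        }

        long committedOrDefault(int partition, long fallback) {
            return committed.getOrDefault(partition, fallback);
        }
    }

    static final class SimConsumer implements RebalanceCallbacks {
        private final OffsetStore store;
        private final boolean commitOnRevoke;
        private long position; // current offset: the next message to read

        SimConsumer(OffsetStore store, boolean commitOnRevoke) {
            this.store = store;
            this.commitOnRevoke = commitOnRevoke;
        }

        void process(int messageCount) {
            position += messageCount;
        }

        long position() {
            return position;
        }

        @Override
        public void onPartitionsAssigned(int partition) {
            // Resume from the last committed offset of the partition.
            position = store.committedOrDefault(partition, 0L);
        }

        @Override
        public void onPartitionsRevoked(int partition) {
            // Commit what was processed before the partition is taken away.
            if (commitOnRevoke) {
                store.commit(partition, position);
            }
        }
    }

    // Returns the offset at which the new owner of partition 0 starts reading.
    static long resumeOffsetAfterRebalance(boolean commitOnRevoke) {
        OffsetStore store = new OffsetStore();
        store.commit(0, 50L);               // committed offset before this batch
        SimConsumer first = new SimConsumer(store, commitOnRevoke);
        first.onPartitionsAssigned(0);      // starts at 50
        first.process(6);                   // offsets 50..55 processed
        first.onPartitionsRevoked(0);       // rebalance begins
        SimConsumer second = new SimConsumer(store, commitOnRevoke);
        second.onPartitionsAssigned(0);     // new owner picks up the partition
        return second.position();
    }

    public static void main(String[] args) {
        System.out.println("without commit on revoke: " + resumeOffsetAfterRebalance(false));
        System.out.println("with commit on revoke   : " + resumeOffsetAfterRebalance(true));
    }
}
```

Without the commit the new owner resumes at 50 and repeats the 6 messages; with it the new owner resumes at 56. In a real consumer the same idea would be a commitSync call inside onPartitionsRevoked.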
RebalanceListener with Consumer
###############################################################
//Create 10 Partitions - RandomProducerTopic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic RandomProducerTopic --partitions 10 --replication-factor 3
Eclipse -> Run -> RandomConsumer.java
You will see all 10 partitions printing
Eclipse -> Run -> RandomConsumer.java (Another instance)
You will now see only 5 partitions printing per instance (once the new consumer is added, a rebalance is triggered, the RebalanceListener callbacks fire, and 5 partitions are assigned to consumer1 and 5 partitions to consumer2)
Following Partitions Revoked ....
Following Partitions commited ....
Following Partitions Assigned ....
3,
2,
1,
0,
4,
Eclipse -> Run -> RandomProducer.java
//it will send data continuously...