Custom Producer
#####################################-
In VMware Workstation -> Kafka VM
Start Zookeeper
Start Broker
Create topic "MyFirstTopic"
Start consumer with that topic "MyFirstTopic"
Open Eclipse (create Custom Producer and Run)
-----------------------------------------------
-> give workspace
-> Create normal Java Project "KafkaProject"
-> Create java class -> SimpleProducer.java
-> take some java files from -> workspaceKafka-20200110T040943Z-001\workspaceKafka\MyKAfkaProject\src\main\java\KafkaCode
-> copy SimpleProducer.java to your eclipse SimpleProducer.java
-> Make sure in code : topic name and Port
-> Run java Application
-> Verify in consumer terminal whether value is appearing or not
//we should see "Value-2" in consumer terminal
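The configuration that SimpleProducer.java needs can be sketched with plain java.util.Properties (the property keys are standard Kafka producer configs; "localhost:9092" and the serializer classes must match the broker port above and assume kafka-clients on the classpath when actually sending):

```java
import java.util.Properties;

// Sketch of the producer configuration SimpleProducer.java would carry.
// Only java.util.Properties is used here; the real producer passes this
// Properties object to new KafkaProducer<>(props).
public class ProducerConfigSketch {
    static Properties producerProps() {
        Properties props = new Properties();
        // Broker address -- must match the listener port of the running broker
        props.put("bootstrap.servers", "localhost:9092");
        // Kafka-supplied serializers that turn the key/value Strings into byte arrays
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps());
    }
}
```

This is exactly where "Make sure in code : topic name and Port" applies: a wrong bootstrap.servers port is the usual reason nothing shows up in the consumer terminal.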
Kafka Cluster
-------------------
1 Node - 1 Broker
1 Node - Multiple brokers
Multiple Nodes - Multiple Brokers
We will try - 1 Node - Multiple brokers
#####################################-
config/server.properties
Broker-id (broker.id=0)
log-dir log.dirs=/tmp/kafka-logs
ip:port (listeners=PLAINTEXT://:9092)
config/server1.properties
Broker-id (broker.id=1)
log-dir log.dirs=/tmp/kafka-logs2
ip:port (listeners=PLAINTEXT://:9093)
config/server2.properties
Broker-id (broker.id=2)
log-dir log.dirs=/tmp/kafka-logs3
ip:port (listeners=PLAINTEXT://:9094)
------------------------------------------------------------
bin/kafka-server-start.sh config/server1.properties
bin/kafka-server-start.sh config/server2.properties
//create topic with replication factor - 3 (because there are 3 brokers running, it can replicate to all 3)
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic NewTopic --partitions 5 --replication-factor 3
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic NewTopic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic NewTopic
-----------------------------------------------------------
Producer Flow (SimpleProducer.java)
#####################
Serializer - based on key/value types
we can mention which Serializer does the serialization (we use Kafka's built-in serializers)
- converts the key/value to a byte array
Producer(props)
ProducerRecord(topicName, key, value)
send(ProdRec)
//we can even pass the Partition# in the constructor
//the partition# it goes to is decided based on the key (key.hashCode)
key.hashCode() % no of partitions
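The key-to-partition rule can be sketched in plain Java. Note: Kafka's DefaultPartitioner actually hashes the key bytes with murmur2 rather than String.hashCode(), but the hash-then-modulo idea is what the notes describe:

```java
// Hash the key, then modulo by the partition count -- same key always
// lands on the same partition. (Kafka's DefaultPartitioner uses murmur2
// instead of String.hashCode(), but the modulo step is the same.)
public class PartitionMath {
    static int partitionFor(String key, int numPartitions) {
        // Masking the sign bit keeps the result non-negative
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println("Key1     -> partition " + partitionFor("Key1", 5));
        System.out.println("Key12345 -> partition " + partitionFor("Key12345", 5));
    }
}
```

This is why changing the key string (as done below with "Key12345") can move the message to a different partition.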
//Future object -SynchronousProducer.java (get())
#####################################-
-> get() -> returns a Future object which contains metadata of the msg: which partition it went to...
metadata.partition()
metadata.offset()
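The blocking behaviour of get() can be mimicked with plain java.util.concurrent types, no broker needed. Metadata here is a stand-in for Kafka's RecordMetadata, and partition 4 / offset 1 simply mirror the sample output below; the real API is producer.send(record).get() returning Future&lt;RecordMetadata&gt;:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Analogy for SynchronousProducer.java: send() returns a Future, and
// calling get() blocks until it completes -- that is what makes the
// producer synchronous.
public class SyncSendSketch {
    static class Metadata {   // stand-in for RecordMetadata
        final int partition; final long offset;
        Metadata(int partition, long offset) { this.partition = partition; this.offset = offset; }
    }

    // A real producer completes this Future asynchronously after the broker ack
    static Future<Metadata> send(String key, String value) {
        return CompletableFuture.completedFuture(new Metadata(4, 1L));
    }

    static Metadata sendSync(String key, String value) {
        try {
            return send(key, value).get();   // blocks here until the ack arrives
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Metadata md = sendSync("Key1", "Value-2");
        System.out.println("Message is sent to Partition no " + md.partition
                + " and offset " + md.offset);
    }
}
```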
-----------------------------------------------
Key is : Key1
Message is sent to Partition no 4 and offset 1
SynchronousProducer Completed with success.
----------------------------------------------
Change the key -> so that it will go to a different partition, because the partition is decided based on the key
//changed key to -> String key = "Key12345";
-----------------------------------------------
Key is : Key12345
Message is sent to Partition no 0 and offset 0
SynchronousProducer Completed with success.
----------------------------------------------
//Custom Serializer -
#####################################-
Supplier.java
SupplierSerializer.java
SimpleProducer.java
SupplierDeserializer.java
SupplierConsumer.java
//create topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic SupplierTopic --partitions 5 --replication-factor 3
-> Eclipse -> Run SupplierConsumer.java
-> Eclipse -> Run SimpleProducer.java
---------------------------------------------------------
Supplier id= 101 Supplier Name = Xyz Pvt Ltd. Supplier Start Date = Fri Apr 01 00:00:00 EDT 2016
Supplier id= 102 Supplier Name = Abc Pvt Ltd. Supplier Start Date = Sun Jan 01 00:00:00 EST 2012
---------------------------------------------------------
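What SupplierSerializer/SupplierDeserializer do can be sketched as a byte-array round trip. The real classes implement Kafka's Serializer/Deserializer interfaces; the byte layout below (int id, length-prefixed UTF-8 name, long epoch-millis date) is an assumption for illustration, not necessarily the layout used in the course code:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Pack the Supplier fields into a byte array and unpack them again --
// the essence of a custom Kafka serializer/deserializer pair.
public class SupplierCodecSketch {
    static class Supplier {
        final int id; final String name; final long startDateMillis;
        Supplier(int id, String name, long startDateMillis) {
            this.id = id; this.name = name; this.startDateMillis = startDateMillis;
        }
    }

    static byte[] serialize(Supplier s) {
        byte[] name = s.name.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + name.length + 8);
        buf.putInt(s.id);
        buf.putInt(name.length);   // length prefix so the reader knows where the name ends
        buf.put(name);
        buf.putLong(s.startDateMillis);
        return buf.array();
    }

    static Supplier deserialize(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        int id = buf.getInt();
        byte[] name = new byte[buf.getInt()];
        buf.get(name);
        return new Supplier(id, new String(name, StandardCharsets.UTF_8), buf.getLong());
    }

    public static void main(String[] args) {
        // 1459483200000L = Fri Apr 01 00:00:00 EDT 2016, as in the output above
        Supplier s = deserialize(serialize(new Supplier(101, "Xyz Pvt Ltd.", 1459483200000L)));
        System.out.println("Supplier id= " + s.id + " Supplier Name = " + s.name);
    }
}
```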
//Custom Partitioner -
//API - Partitioner
#####################################-
1. //Default Partitioner - (Topic, V) => with no key, msgs go to partitions in round-robin fashion
2. Topic, K, V
k.hashCode() % no of partitions
3. Topic, K, V, P#
4. Custom partitioner
Eg: if the input has a hot key "AAA" (40% of overall data)
if we go with the 3rd option - all "AAA" msgs will go to only that one partition and it will be heavier (40% of overall data)
So we go with custom logic; then we can say "AAA" can go to 3 partitions instead of 1
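The routing idea can be sketched as pure logic. The real SensorPartitioner.java implements Kafka's Partitioner interface; the 3/7 split over 10 partitions and hashing the message value for the hot key are assumptions for illustration:

```java
// Spread the hot key "AAA" (40% of traffic) across 3 partitions by hashing
// the message value, and hash every other key into the remaining partitions.
public class HotKeyRouting {
    static final int NUM_PARTITIONS = 10;
    static final int HOT_PARTITIONS = 3;   // partitions 0..2 reserved for "AAA"

    static int partitionFor(String key, String value) {
        if ("AAA".equals(key)) {
            // Hash the value, not the key, so "AAA" msgs spread over partitions 0..2
            return (value.hashCode() & 0x7fffffff) % HOT_PARTITIONS;
        }
        // Other keys go to partitions 3..9
        return HOT_PARTITIONS + (key.hashCode() & 0x7fffffff) % (NUM_PARTITIONS - HOT_PARTITIONS);
    }

    public static void main(String[] args) {
        System.out.println("AAA/reading-1 -> " + partitionFor("AAA", "reading-1"));
        System.out.println("BBB/reading-1 -> " + partitionFor("BBB", "reading-1"));
    }
}
```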
Custom partition
#####################################-
SensorPartitioner.java
SensorProducer.java
//create topic with 10 partitions
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic SensorTopic --partitions 10 --replication-factor 3
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic SensorTopic
-> Eclipse -> Run SensorProducer.java
Zookeeper Monitor
#####################################-
bin/zookeeper-shell.sh localhost:2181
//You will see : WatchedEvent state:SyncConnected type:None path:null
//type below ls commands to check ids, topics, brokers etc...
ls /brokers/ids
[0, 1, 2]
ls /brokers
[ids, topics, seqid]
ls /brokers/topics
[NewTopic, SensorTopic, SupplierTopic, MyFirstTopic, __consumer_offsets]
ls /brokers/topics/NewTopic
[partitions]
ls /brokers/topics/NewTopic/partitions
[0, 1, 2, 3, 4]
Consumer
##########################
- it is mandatory to assign a Consumer to a Consumer Group (to increase scalability)
- By default a consumer gets data from all partitions.
- If a partition is removed or a Consumer is added to the group => an activity called Rebalancing is triggered within that single Consumer Group
Rebalancing => uses an algorithm called a "PartitionAssignor" (RoundRobin, Range)
After Rebalancing -> partitions may go to a different consumer than the one that was working on them earlier
- 2 consumers within the same consumer group operating on a single partition (eg: P0) -> is not possible, but a consumer from another consumer group can operate on it
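What a round-robin PartitionAssignor does inside one group can be sketched as dealing partitions out to consumers one by one (a simplification of Kafka's RoundRobinAssignor, which also handles multiple topics):

```java
import java.util.ArrayList;
import java.util.List;

// Deal the topic's partitions to the group's consumers round-robin, so no
// partition is owned by two consumers of the same group.
public class RoundRobinAssignSketch {
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) owned.add(new ArrayList<>());
        for (int p = 0; p < numPartitions; p++) {
            owned.get(p % numConsumers).add(p);   // round robin
        }
        return owned;
    }

    public static void main(String[] args) {
        // 5 partitions, 2 consumers in the group
        System.out.println(assign(5, 2));   // [[0, 2, 4], [1, 3]]
    }
}
```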
Group Coordinator
----------------
Kafka side : 1 broker per consumer group acts as the Group Coordinator
Consumer side : in each consumer group 1 consumer acts as the Group Leader, usually the 1st consumer
All consumers in the group send heartbeats to that broker (the Group Coordinator)
The Group Leader of the consumer group triggers the "PartitionAssignor"
Broker (Group Coordinator) => asks for a rebalance, because the broker gets heartbeats from each consumer: if there were 3 consumers in the group earlier and 1 more is added, it now gets 4 heartbeats, so it initiates Rebalancing => then the Group Leader decides which consumer works on which partition
Offset
-------
Current offset: the consumer itself should not re-read the same msg; the current offset is maintained for that
Committed offset: after rebalancing, the new consumer should not read messages again, so the committed offset is maintained.
1. Auto commit => auto.commit.interval.ms : 5 sec (it auto-commits every 5 sec)
2. Manual commit
50 ------------60
| |
assume 6 msgs from 50 - 60 are processed and Rebalancing happens: the 6 are already read, but once the new consumer comes it will start again from 50
For this we can have a RebalanceListener -> once a Rebalance triggers -> commit whatever was already read (the 6 msgs)
onPartitionsAssigned(), onPartitionsRevoked()
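The bookkeeping described above can be sketched without Kafka. The method names mirror the ConsumerRebalanceListener idea but this is a stand-alone illustration, not the real interface:

```java
import java.util.HashMap;
import java.util.Map;

// Track the next offset to read per partition, and commit that map the
// moment partitions are revoked -- so the consumer that takes over resumes
// after the already-processed messages instead of re-reading from 50.
public class RebalanceBookkeeping {
    final Map<Integer, Long> currentOffsets = new HashMap<>();
    final Map<Integer, Long> committedOffsets = new HashMap<>();

    void onMessage(int partition, long offset) {
        currentOffsets.put(partition, offset + 1);  // next offset to read
    }

    void onPartitionsRevoked() {
        committedOffsets.putAll(currentOffsets);    // commit before losing ownership
    }

    public static void main(String[] args) {
        RebalanceBookkeeping rb = new RebalanceBookkeeping();
        for (long off = 50; off < 56; off++) rb.onMessage(0, off);  // 6 msgs processed
        rb.onPartitionsRevoked();
        // New owner of partition 0 resumes at 56, not 50
        System.out.println(rb.committedOffsets);
    }
}
```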
RebalanceListener with Consumer
###############################################################
//Create 10 Partitions - RandomProducerTopic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic RandomProducerTopic --partitions 10 --replication-factor 3
Eclipse -> Run -> RandomConsumer.java
You will see all 10 partitions printing
Eclipse -> Run -> RandomConsumer.java (Another instance)
You will see 5 partitions printing in each (once the new consumer is added, the RebalanceListener triggers and assigns 5 partitions to consumer1 and 5 partitions to consumer2)
Following Partitions Revoked ....
Following Partitions commited ....
Following Partitions Assigned ....
3,
2,
1,
0,
4,
Eclipse -> Run -> RandomProducer.java
//it will send data continuously...
Cluster Mirroring
#############################
- Cluster1 - Cluster2 (copy of Cluster1)
3 options for mirroring
-------------------------------
1. pour data from 3 kafka clusters into one central cluster for analysis
2. keep an exact copy of another cluster
3. bin/kafka-mirror-maker.sh
create a copy of zookeeper.properties -> zookeeper1.properties
clientPort=2182
create a copy of server.properties -> serverzoo2.properties
listeners=PLAINTEXT://:9095
zookeeper.connect=localhost:2182
log.dirs=/tmp/kafka-logs-mirror (so it does not clash with the first cluster's log dir on the same VM)
cd kafka_2.12-2.2.1
bin/zookeeper-server-start.sh config/zookeeper1.properties
bin/kafka-server-start.sh config/serverzoo2.properties
bin/kafka-topics.sh --zookeeper localhost:2182 --create --topic MyNewClusterTopic --partitions 3 --replication-factor 1
bin/kafka-topics.sh --zookeeper localhost:2182 --list
bin/zookeeper-shell.sh localhost:2182
ls /brokers/ids
ls /brokers
ls /brokers/topics