Saturday, July 18, 2020

15. KAFKA : Kafka Theory More...

Custom Producer
#####################################-
In VMWare Workstation -> Kafka VM
Start Zookeeper
Start Broker
Create topic "MyFirstTopic"
Start consumer with that topic "MyFirstTopic"

Open Eclipse  (create Custom Producer and Run)
-----------------------------------------------
-> give workspace
-> Create normal Java Project "KafkaProject"
-> Create java class -> SimpleProducer.java
-> take some java files from -> workspaceKafka-20200110T040943Z-001\workspaceKafka\MyKAfkaProject\src\main\java\KafkaCode
-> copy SimpleProducer.java to your eclipse SimpleProducer.java
-> Make sure the topic name and port in the code match your running broker
-> Run java Application
-> Verify in consumer terminal whether value is appearing or not
    //we should see "Value-2" in consumer terminal



Kafka Cluster
-------------------

1 Node - 1 Broker

1 Node - Multiple brokers

Multiple Nodes - Multiple Brokers


We will try - 1 Node - Multiple brokers
#####################################-
config/server.properties
    Broker-id   (broker.id=0)
    log-dir     (log.dirs=/tmp/kafka-logs)
    ip:port     (listeners=PLAINTEXT://:9092)

config/server1.properties
    Broker-id   (broker.id=1)
    log-dir     (log.dirs=/tmp/kafka-logs2)
    ip:port     (listeners=PLAINTEXT://:9093)

config/server2.properties
    Broker-id   (broker.id=2)
    log-dir     (log.dirs=/tmp/kafka-logs3)
    ip:port     (listeners=PLAINTEXT://:9094)

------------------------------------------------------------
bin/kafka-server-start.sh config/server.properties
bin/kafka-server-start.sh config/server1.properties
bin/kafka-server-start.sh config/server2.properties

//create topic with replication factor 3 (because there are 3 brokers running, it can replicate to all 3)
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic NewTopic --partitions 5 --replication-factor 3

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic NewTopic

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic NewTopic


-----------------------------------------------------------


Producer Flow (SimpleProducer.java)
#####################

Serializer - based on key/value
    we can specify which serializer does the serialization (we use a Kafka serializer)
    - converts to a byte array


Producer(props)
ProducerRecord(topicName, key, value)
send(ProdRec)

//we can even pass the partition# in the constructor
//the partition# where the record goes is decided based on the key (key.hashCode)
    key.hashCode() % no of partitions
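The key-to-partition mapping can be sketched in plain Java. This is an illustration only: Kafka's real DefaultPartitioner uses murmur2 hashing rather than Java's hashCode, but the "hash the key, then take it modulo the partition count" idea is the same.

```java
public class PartitionForKey {
    // Illustration only: Kafka's actual DefaultPartitioner uses murmur2
    // hashing, but the principle is identical.
    static int partitionFor(String key, int numPartitions) {
        // mask the sign bit so the result is never negative
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // the same key always maps to the same partition
        System.out.println(partitionFor("Key1", 5));
        System.out.println(partitionFor("Key12345", 5));
    }
}
```

Because the mapping depends only on the key and the partition count, changing the key (as the "Key12345" run below shows) can move the message to a different partition.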


//Future object -SynchronousProducer.java (get())
#####################################-
 -> get() -> returns a Future object which contains metadata of the msg: which partition it went to...
    metadata.partition()
    metadata.offset()

    -----------------------------------------------
    Key is : Key1
    Message is sent to Partition no 4 and offset 1
    SynchronousProducer Completed with success.
    ----------------------------------------------

Change the key -> so it will go to a different partition, because the partition is decided based on the key

    //changed key to -> String key = "Key12345";

    -----------------------------------------------
    Key is : Key12345
    Message is sent to Partition no 0 and offset 0
    SynchronousProducer Completed with success.
    ----------------------------------------------


//Custom Serializer -
#####################################-
Supplier.java
SupplierSerializer.java
SimpleProducer.java

SupplierDeserializer.java
SupplierConsumer.java

//create topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic SupplierTopic --partitions 5 --replication-factor 3


-> Eclipse -> Run SupplierConsumer.java
-> Eclipse -> Run SimpleProducer.java

        ---------------------------------------------------------
        Supplier id= 101 Supplier  Name = Xyz Pvt Ltd. Supplier Start Date = Fri Apr 01 00:00:00 EDT 2016
        Supplier id= 102 Supplier  Name = Abc Pvt Ltd. Supplier Start Date = Sun Jan 01 00:00:00 EST 2012
        ---------------------------------------------------------
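Stripped of the Kafka API, a custom serializer/deserializer pair is just object-to-byte[] and back. Here is a self-contained sketch of an encoding the SupplierSerializer/SupplierDeserializer pair could use; the exact field layout is an assumption, not the course's actual code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SupplierCodec {
    // Assumed layout: int id | int nameLength | name bytes (UTF-8) | long startDateMillis
    static byte[] serialize(int id, String name, long startDateMillis) {
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + nameBytes.length + 8);
        buf.putInt(id);
        buf.putInt(nameBytes.length);
        buf.put(nameBytes);
        buf.putLong(startDateMillis);
        return buf.array();
    }

    // The mirror image, as a SupplierDeserializer would do in its deserialize()
    static Object[] deserialize(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        int id = buf.getInt();
        byte[] nameBytes = new byte[buf.getInt()];
        buf.get(nameBytes);
        String name = new String(nameBytes, StandardCharsets.UTF_8);
        long startDateMillis = buf.getLong();
        return new Object[] { id, name, startDateMillis };
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(101, "Xyz Pvt Ltd.", 1459483200000L);
        Object[] fields = deserialize(bytes);
        System.out.println("Supplier id= " + fields[0] + " Supplier Name = " + fields[1]);
    }
}
```

In the real classes this logic sits behind Kafka's Serializer<T>/Deserializer<T> interfaces, which are what the producer and consumer configs point at.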



//Custom Partitioner -
//API - Partitioner

#####################################-
1. //Default Partitioner - (Topic, V) => with no key, msgs go round-robin to the partitions in the broker

2. Topic, K, V
    k.hashCode() % no of Partition

3. Topic, k, V, P#

4. Custom partition
    Eg: if the input has one very frequent key such as "AAA" (40% of overall data)
    if we go with the key-hash option - all "AAA" will go to only one partition and it will be heavier (40% of overall data)
    So we go with custom logic: then we can tell "AAA" to go to 3 partitions instead of 1
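A toy version of such custom logic, as a plain function. This is a hypothetical rule, not the actual SensorPartitioner: spread the hot key "AAA" over the first 3 partitions using a secondary hash on the value, and hash every other key into the remaining partitions.

```java
public class HotKeySpreadPartitioner {
    // Hypothetical rule: "AAA" records are spread over partitions 0..2
    // by hashing the VALUE; all other keys hash into partitions 3..n-1.
    static int partitionFor(String key, String value, int numPartitions) {
        if (key.startsWith("AAA")) {
            // secondary hash on the value spreads the hot key over 3 partitions
            return (value.hashCode() & 0x7fffffff) % 3;
        }
        return 3 + (key.hashCode() & 0x7fffffff) % (numPartitions - 3);
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("AAA", "reading-17", 10));  // always in 0..2
        System.out.println(partitionFor("BBB", "reading-17", 10));  // always in 3..9
    }
}
```

In real code this logic would live in the partition() method of a class implementing Kafka's Partitioner interface, registered via the producer's partitioner.class property.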



Custom partition
#####################################-
SensorPartitioner.java
SensorProducer.java


//create topic with 10 partitions
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic SensorTopic --partitions 10 --replication-factor 3

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic SensorTopic

-> Eclipse -> Run SensorProducer.java


Zookeeper Monitor
#####################################-
bin/zookeeper-shell.sh localhost:2181

//You will see : WatchedEvent state:SyncConnected type:None path:null
//type below ls commands to check ids, topics, brokers etc...
ls /brokers/ids
    [0, 1, 2]
ls /brokers
    [ids, topics, seqid]
ls /brokers/topics
    [NewTopic, SensorTopic, SupplierTopic, MyFirstTopic, __consumer_offsets]
ls /brokers/topics/NewTopic
    [partitions]
ls /brokers/topics/NewTopic/partitions
    [0, 1, 2, 3, 4]


Consumer
##########################
- mandatory to assign a Consumer to a Consumer Group (to increase scalability)
- By default a consumer gets data from all partitions.
- If a partition is removed or a Consumer is added to the group => an activity called Rebalancing is triggered within that single Consumer Group
    Rebalancing => uses an algorithm called a "partition assignor" (RoundRobin, Range)
    After Rebalancing -> partitions may go to a different consumer than the one they were on earlier
- 2 consumers within the same consumer group operating on a single partition (eg: P0) -> is not possible, but a consumer from another consumer group can operate on it
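What a round-robin partition assignor does during a rebalance can be sketched as dealing partitions out like cards. Simplified sketch (one topic, all consumers subscribed to it; Kafka's real assignor also handles multiple topics and subscriptions):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinAssignor {
    // Deal partitions 0..numPartitions-1 to the consumers in turn
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String c : consumers) assignment.put(c, new ArrayList<>());
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(consumers.get(p % consumers.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 10 partitions over 2 consumers -> 5 partitions each,
        // matching the RandomConsumer demo later in these notes
        System.out.println(assign(Arrays.asList("consumer-1", "consumer-2"), 10));
    }
}
```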

 Group Coordinator
----------------
Kafka side : 1 broker per consumer group acts as the Group Coordinator
Consumer side : in each consumer group 1 consumer acts as the Group Leader, usually the 1st consumer to join

All consumers in the group send heartbeats to the Group Coordinator (broker)
The Group Leader in the consumer group runs the "partition assignor"
The Coordinator asks for a rebalance because it tracks heartbeats from each consumer: if the group earlier had 3 consumers and we add 1 more, it now receives 4 heartbeats, so it initiates Rebalancing => then the Group Leader decides which consumer works on which partition

Offset
-------
Current offset: the current consumer should not re-read the same msg; for that we maintain the current offset

Committed offset: after rebalancing the new consumer should not read messages again, so the committed offset is maintained.

1. Auto commit  => auto.commit.interval.ms : 5 sec  (it auto-commits every 5 sec)
2. Manual commit
        50 ------------ 60

    assume 6 msgs from 50-60 have been processed when Rebalancing happens: those 6 are already read, but once the new consumer takes over it will start again from 50
    For this we can have a RebalanceListener -> once a Rebalance triggers -> commit whatever has been read (the 6 msgs)
                onPartitionsAssigned(), onPartitionsRevoked()
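The bookkeeping that pairs with onPartitionsRevoked() can be sketched without any Kafka dependency. This is a hypothetical helper; in real code the listener would call consumer.commitSync() with these offsets.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetTracker {
    // last-processed bookkeeping per partition; a Kafka commit stores the
    // offset of the NEXT message to read, hence the +1 below
    private final Map<Integer, Long> toCommit = new HashMap<>();
    private final Map<Integer, Long> committed = new HashMap<>();

    void markProcessed(int partition, long offset) {
        toCommit.put(partition, offset + 1);
    }

    // stand-in for calling commitSync() inside onPartitionsRevoked()
    void onPartitionsRevoked() {
        committed.putAll(toCommit);
    }

    long committedOffset(int partition) {
        return committed.getOrDefault(partition, 0L);
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        for (long off = 50; off < 56; off++) tracker.markProcessed(0, off); // 6 msgs processed
        tracker.onPartitionsRevoked();
        System.out.println(tracker.committedOffset(0)); // prints 56: the next owner resumes at 56, not 50
    }
}
```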

RebalanceListener with Consumer
###############################################################
//Create 10 Partitions - RandomProducerTopic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic RandomProducerTopic --partitions 10 --replication-factor 3


Eclipse -> Run -> RandomConsumer.java
    You will see all 10 partitions printing

Eclipse -> Run -> RandomConsumer.java (Another instance)
    You will see only 5 partitions printing    (once the new consumer is added, the RebalanceListener triggers and assigns 5 partitions to consumer1 and 5 partitions to consumer2)

        Following Partitions Revoked ....
        Following Partitions commited ....
        Following Partitions Assigned ....
        3,
        2,
        1,
        0,
        4,

Eclipse -> Run -> RandomProducer.java
        //it will send data continuously...
       

Cluster Mirroring
#############################
- Cluster1 - Cluster2 (copy of cluster1)

3 options for doing mirroring
-------------------------------
1. pour from 3 Kafka clusters -> into one central cluster for analysis
2. exact copy of another cluster
3. bin/kafka-mirror-maker.sh


create a copy of zookeeper.properties -> zookeeper1.properties
    clientPort=2182

create a copy of server.properties -> serverzoo2.properties
    listeners=PLAINTEXT://:9095
    zookeeper.connect=localhost:2182

cd kafka_2.12-2.2.1
bin/zookeeper-server-start.sh config/zookeeper1.properties
bin/kafka-server-start.sh config/serverzoo2.properties

bin/kafka-topics.sh --zookeeper localhost:2182 --create --topic MyNewClusterTopic --partitions 3 --replication-factor 1

bin/kafka-topics.sh --zookeeper localhost:2182 --list
bin/zookeeper-shell.sh localhost:2182
    ls /brokers/ids
    ls /brokers
    ls /brokers/topics
