Tuesday, January 14, 2020

13. KAFKA : Sample Program11 : Cluster Mirroring



Cluster Mirroring
#############################
- Cluster2 is kept as a copy of Cluster1

3 options for mirroring
-------------------------------
1. Pour data from 3 Kafka clusters into a central cluster for analysis
2. Keep an exact copy of another cluster
3. Tool shipped with Kafka: bin/kafka-mirror-maker.sh (MirrorMaker)


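Create a copy of zookeeper.properties named zookeeper1.properties and change:
clientPort=2182
(also point dataDir to a different directory, e.g. /tmp/zookeeper1, so the two ZooKeepers do not share data)

Create a copy of server.properties named serverzoo2.properties and change:
listeners=PLAINTEXT://:9095
zookeeper.connect=localhost:2182
(also point log.dirs to a different directory, e.g. /tmp/kafka-logs-cluster2, so this broker does not clash with the first cluster's broker)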

cd kafka_2.12-2.2.1
bin/zookeeper-server-start.sh config/zookeeper1.properties
bin/kafka-server-start.sh config/serverzoo2.properties

bin/kafka-topics.sh --zookeeper localhost:2182 --create --topic MyNewClusterTopic --partitions 3 --replication-factor 1

bin/kafka-topics.sh --zookeeper localhost:2182 --list 
bin/zookeeper-shell.sh localhost:2182
ls /brokers/ids
ls /brokers
ls /brokers/topics
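Once both clusters are up, MirrorMaker (bin/kafka-mirror-maker.sh) can copy topics from Cluster1 into Cluster2: it consumes from the source cluster and produces into the target, each side configured by its own properties file. A minimal sketch that builds both configs with java.util.Properties, assuming Cluster1 is the broker on localhost:9092 from the earlier programs and Cluster2 is the broker on localhost:9095 configured above; the class name MirrorMakerConfigSketch, the group id mirror-group and the auto.offset.reset choice are assumptions, not part of the original setup:

```java
import java.util.Properties;
import java.util.TreeSet;

// Sketch: the two config files the legacy MirrorMaker (bin/kafka-mirror-maker.sh) reads.
// Assumptions: Cluster1 (source) on localhost:9092, Cluster2 (target) on localhost:9095,
// "mirror-group" is a hypothetical consumer group name.
class MirrorMakerConfigSketch {

    // Consumer side: reads from the source cluster as a member of a consumer group.
    static Properties consumerConfig(String sourceBootstrap, String groupId) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", sourceBootstrap);
        p.setProperty("group.id", groupId);
        // Assumption: also copy data that already exists, not only new messages.
        p.setProperty("auto.offset.reset", "earliest");
        return p;
    }

    // Producer side: writes the mirrored messages into the target cluster.
    static Properties producerConfig(String targetBootstrap) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", targetBootstrap);
        return p;
    }

    // Renders sorted key=value lines, i.e. what the .properties file would contain.
    static String render(Properties p) {
        StringBuilder sb = new StringBuilder();
        for (String key : new TreeSet<>(p.stringPropertyNames())) {
            sb.append(key).append('=').append(p.getProperty(key)).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println("# mirror-consumer.properties");
        System.out.print(render(consumerConfig("localhost:9092", "mirror-group")));
        System.out.println("# mirror-producer.properties");
        System.out.print(render(producerConfig("localhost:9095")));
    }
}
```

Saved as mirror-consumer.properties and mirror-producer.properties (hypothetical file names), the files would be passed to the tool as bin/kafka-mirror-maker.sh --consumer.config mirror-consumer.properties --producer.config mirror-producer.properties --whitelist "MyFirstTopic"; the --whitelist value is a regular expression of topic names to mirror.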



12. KAFKA : Sample Program10 : Zookeeper Monitor



Zookeeper Monitor
#####################################-
bin/zookeeper-shell.sh localhost:2181

//You will see : WatchedEvent state:SyncConnected type:None path:null
//run the ls commands below to check broker ids, topics, brokers etc...
ls /brokers/ids
[0, 1, 2]
ls /brokers
[ids, topics, seqid]
ls /brokers/topics
[NewTopic, SensorTopic, SupplierTopic, MyFirstTopic, __consumer_offsets]
ls /brokers/topics/NewTopic
[partitions]
ls /brokers/topics/NewTopic/partitions
[0, 1, 2, 3, 4]



11. KAFKA : Sample Program9 : Consumer : RebalanceListener with Consumer



Consumer
##########################
- Assign every consumer to a consumer group (group.id is required when using subscribe()); consumer groups are what let consumption scale out
- By default a consumer gets data from all partitions of the topic (when it is the only consumer in its group)
- If a partition is added, or a consumer joins or leaves the group => an activity called Rebalancing is triggered within that single consumer group
Rebalancing => uses a partition assignment strategy, a "PartitionAssignor" (RoundRobin, Range), set with partition.assignment.strategy
After Rebalancing -> a partition may go to a different consumer than the one that was working on it earlier
- 2 consumers within the same consumer group cannot read the same partition (eg: P0) at the same time, but a consumer from another consumer group can
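The Range and RoundRobin strategies can be sketched for a single topic in plain Java. This is a simplified model, not Kafka's RangeAssignor / RoundRobinAssignor classes (the real ones handle several topics and work on member ids); the class name AssignmentSketch and the consumer names are hypothetical. Range is the default strategy in Kafka 2.x; with 10 partitions and 2 consumers it gives each consumer a contiguous block of 5, and every partition goes to exactly one consumer of the group:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified single-topic model of two assignment strategies (assumes at least one consumer).
class AssignmentSketch {

    // Range: sorted consumers get contiguous blocks; the first (partitions % consumers) get one extra.
    static Map<String, List<Integer>> range(int numPartitions, List<String> consumers) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int perConsumer = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size();
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                parts.add(next++);
            }
            result.put(sorted.get(i), parts);
        }
        return result;
    }

    // RoundRobin: partitions are dealt out one at a time, like cards.
    static Map<String, List<Integer>> roundRobin(int numPartitions, List<String> consumers) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String c : sorted) {
            result.put(c, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            result.get(sorted.get(p % sorted.size())).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> group = Arrays.asList("consumer1", "consumer2");
        System.out.println("range:      " + range(10, group));
        System.out.println("roundRobin: " + roundRobin(10, group));
    }
}
```

With 3 consumers and 10 partitions, Range hands out 4, 3 and 3 partitions, which is why the first consumer can end up busier than the others.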

Group Coordinator
----------------
Kafka side : for each consumer group, 1 broker acts as the Group Coordinator
Consumer side : within each consumer group, 1 consumer acts as the Group Leader, usually the 1st consumer to join

All consumers of a group send heartbeats to the Group Coordinator broker in Kafka
The Group Leader (a consumer) runs the "PartitionAssignor"
The Group Coordinator (broker) starts the rebalance, because it tracks the members of the group: if the group had 3 consumers and 1 more consumer joins, the coordinator now sees 4 members (and a consumer whose heartbeats stop is treated as gone), so it initiates Rebalancing => then the Group Leader decides which consumer works on which partition, and the coordinator passes that assignment on to every consumer

Offset
-------
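Current offset: the position of the next message this consumer will read, so the same consumer does not receive the same message twice

Committed offset: the position saved for the consumer group, so that after a rebalance the new consumer of a partition does not read messages again

1. Auto commit => enable.auto.commit=true; offsets are committed automatically every auto.commit.interval.ms (default 5000 ms, i.e. every 5 sec)
2. Manual commit => enable.auto.commit=false; the application calls commitSync() / commitAsync()

offsets: 50 ------------ 60   (committed offset = 50)

Assume 6 of the messages between 50 and 60 (offsets 50-55) have been processed when a rebalance happens. They were read but not committed, so the new consumer of that partition starts again from 50 and processes those 6 messages a second time.
To avoid this, use a RebalanceListener (ConsumerRebalanceListener): once a rebalance is triggered, commit whatever has been processed so far (the 6 messages) before the partition is handed over.
Callbacks: onPartitionsRevoked() (runs before the rebalance takes partitions away; commit here), onPartitionsAssigned() (runs after the new assignment)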
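The 50-60 scenario can be replayed with two counters. This is a plain-Java simulation of one partition (the class RebalanceOffsetSketch is hypothetical and uses no Kafka classes): without a commit on revoke the new owner restarts at 50 and re-reads 6 messages; with it, the new owner starts at 56. In real code the commit would sit in a class implementing org.apache.kafka.clients.consumer.ConsumerRebalanceListener, calling consumer.commitSync(Map<TopicPartition, OffsetAndMetadata>) from onPartitionsRevoked() with last processed offset + 1 for each partition.

```java
// Simulation only: one partition's offsets modelled as plain longs, no Kafka classes.
class RebalanceOffsetSketch {

    long committed; // offset saved for the group: where a new owner starts reading
    long position;  // "current offset": next offset this consumer will process

    RebalanceOffsetSketch(long committed) {
        this.committed = committed;
        this.position = committed;
    }

    // Process n messages; only the in-memory position moves, nothing is committed.
    void process(int n) {
        position += n;
    }

    // The job of onPartitionsRevoked(): commit the position before the partition moves.
    void onPartitionsRevoked(boolean commitOnRevoke) {
        if (commitOnRevoke) {
            committed = position; // Kafka convention: commit the offset of the NEXT message to read
        }
    }

    // Where the consumer that receives the partition after the rebalance starts.
    long newOwnerStartsAt() {
        return committed;
    }

    public static void main(String[] args) {
        for (boolean commit : new boolean[] {false, true}) {
            RebalanceOffsetSketch p = new RebalanceOffsetSketch(50);
            p.process(6); // offsets 50..55 handled
            p.onPartitionsRevoked(commit);
            long start = p.newOwnerStartsAt();
            System.out.println("commit on revoke=" + commit + " -> new owner starts at " + start
                    + ", re-reads " + (p.position - start) + " message(s)");
        }
    }
}
```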


RebalanceListener with Consumer
###############################################################
//Create 10 Partitions - RandomProducerTopic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic RandomProducerTopic --partitions 10 --replication-factor 3


Eclipse -> Run -> RandomConsumer.java
You will see all 10 partitions printing (it is the only consumer in the group)

Eclipse -> Run -> RandomConsumer.java (another instance)
Each instance now prints 5 partitions: once the new consumer joins, a rebalance is triggered, the RebalanceListener callbacks run, and 5 partitions are assigned to consumer1 and 5 partitions to consumer2

Following Partitions Revoked ....
Following Partitions commited ....
Following Partitions Assigned ....
3,
2,
1,
0,
4,

Eclipse -> Run -> RandomProducer.java
//it will send data continuously...














10. KAFKA : Sample Program8 : Consumer : Manual Consumer





package KafkaCode;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualConsumer {
    public static void main(String[] args) throws Exception {

        String topicName = "SupplierTopic";
        String groupName = "SupplierTopicGroup";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        props.put("group.id", groupName);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Fully qualified class name: SupplierDeserializer lives in package KafkaCode
        props.put("value.deserializer", "KafkaCode.SupplierDeserializer");
        // Manual commit: offsets are committed only by the commit calls below
        props.put("enable.auto.commit", "false");

        KafkaConsumer<String, Supplier> consumer = null;

        try {
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topicName));

            while (true) {
                // poll(Duration) replaces the deprecated poll(long) since Kafka 2.0
                ConsumerRecords<String, Supplier> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, Supplier> record : records) {
                    System.out.println("Supplier id= " + record.value().getID()
                            + " Supplier  Name = " + record.value().getName()
                            + " Supplier Start Date = " + record.value().getStartDate());
                }
                // Non-blocking commit after each batch (a failed async commit is not retried)
                consumer.commitAsync();
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            // consumer is still null if the constructor itself failed
            if (consumer != null) {
                try {
                    // Blocking commit before shutting down, so the last batch is not re-read
                    consumer.commitSync();
                } finally {
                    consumer.close();
                }
            }
        }
    }
}




9. KAFKA : Sample Program7 : With Custom partition



Custom partition
#####################################-
SensorPartitioner.java
SensorProducer.java


//create topic with 10 partitions
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic SensorTopic --partitions 10 --replication-factor 3

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic SensorTopic

-> Eclipse -> Run SensorProducer.java

###########################################################################
package KafkaCode;

import java.util.*;

import org.apache.kafka.common.*;
import org.apache.kafka.common.utils.*;
import org.apache.kafka.common.record.*;
import org.apache.kafka.clients.producer.Partitioner;

// Sends the speed sensor's messages to the first 30% of partitions and all other keys to the rest
public class SensorPartitioner implements Partitioner {
    private String speedSensorName;

    @Override
    public void configure(Map<String, ?> configs) {
        // Custom producer property set in SensorProducer: props.put("speed.sensor.name", "TSS")
        speedSensorName = configs.get("speed.sensor.name").toString();
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // Partitions reserved for the speed sensor (3 of 10). The topic needs at least 4 partitions,
        // otherwise sp is 0 and the modulo below divides by zero.
        int sp = (int) Math.abs(numPartitions * 0.3);
        int p = 0;

        if ((keyBytes == null) || (!(key instanceof String)))
            throw new InvalidRecordException("All messages must have sensor name as key");

        if (((String) key).equals(speedSensorName)) {
            // Speed sensor: hash the value so its messages spread over partitions 0..sp-1
            p = Utils.toPositive(Utils.murmur2(valueBytes)) % sp;
        } else {
            // Other sensors: hash the key into partitions sp..numPartitions-1
            p = Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - sp) + sp;
        }

        System.out.println("Key = " + (String) key + " Partition = " + p);
        return p;
    }

    @Override
    public void close() {}
}



############################################################################


package KafkaCode;

import java.util.*;
import org.apache.kafka.clients.producer.*;

public class SensorProducer {
    public static void main(String[] args) throws Exception {

        String topicName = "SensorTopic";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Use the custom partitioner instead of the default one
        props.put("partitioner.class", "KafkaCode.SensorPartitioner");
        // Custom property, read by SensorPartitioner.configure()
        props.put("speed.sensor.name", "TSS");

        Producer<String, String> producer = new KafkaProducer<>(props);

        // 10 messages from other sensors (keys SSP0..SSP9) -> partitions 3..9
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>(topicName, "SSP" + i, "500" + i));
        }

        // 10 messages from the speed sensor (key TSS) -> partitions 0..2
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>(topicName, "TSS", "500" + i));
        }

        producer.close();

        System.out.println("SensorProducer Completed.");
    }
}
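The split arithmetic in SensorPartitioner can be checked on its own, without a broker. In this sketch the murmur2 hash is replaced by an already-positive int passed in by the caller, so only the 30% / 70% ranges are exercised; the class name SensorSplitSketch is hypothetical, and the explicit check for fewer than 4 partitions is an addition that the original partitioner does not have:

```java
// Mirrors SensorPartitioner's arithmetic; positiveHash stands in for Utils.toPositive(Utils.murmur2(bytes)).
class SensorSplitSketch {

    // Partitions reserved for the speed sensor: 30% of the topic, truncated.
    static int reserved(int numPartitions) {
        int sp = (int) (numPartitions * 0.3);
        if (sp == 0) {
            // With fewer than 4 partitions sp is 0 and "hash % sp" would divide by zero.
            throw new IllegalArgumentException("need at least 4 partitions, got " + numPartitions);
        }
        return sp;
    }

    static int partition(boolean isSpeedSensor, int positiveHash, int numPartitions) {
        int sp = reserved(numPartitions);
        return isSpeedSensor
                ? positiveHash % sp                          // partitions 0 .. sp-1
                : positiveHash % (numPartitions - sp) + sp;  // partitions sp .. numPartitions-1
    }

    public static void main(String[] args) {
        int n = 10; // SensorTopic was created with 10 partitions
        System.out.println("reserved for TSS: partitions 0.." + (reserved(n) - 1));
        for (int hash = 0; hash < 5; hash++) {
            System.out.println("hash " + hash + ": TSS -> " + partition(true, hash, n)
                    + ", other sensor -> " + partition(false, hash, n));
        }
    }
}
```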










8. KAFKA : Sample Program6 : With Custom Serializer


//Custom Serializer -
#####################################-
Supplier.java
SupplierSerializer.java
SimpleProducer.java

SupplierDeserializer.java
SupplierConsumer.java

//create topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic SupplierTopic --partitions 5 --replication-factor 3


-> Eclipse -> Run SupplierConsumer.java
-> Eclipse -> Run SimpleProducer.java

---------------------------------------------------------
Supplier id= 101 Supplier  Name = Xyz Pvt Ltd. Supplier Start Date = Fri Apr 01 00:00:00 EDT 2016
Supplier id= 102 Supplier  Name = Abc Pvt Ltd. Supplier Start Date = Sun Jan 01 00:00:00 EST 2012
---------------------------------------------------------
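SupplierSerializer.java and SupplierDeserializer.java are not listed here. One common way to write such a pair is a fixed byte layout built with java.nio.ByteBuffer: the id, then each string as a length followed by its bytes. A sketch of that layout, assuming a hypothetical SupplierData class with the start date kept as a String (the real Supplier uses a Date). In the actual classes this logic would sit in serialize(String topic, Supplier data) of an org.apache.kafka.common.serialization.Serializer<Supplier> and deserialize(String topic, byte[] data) of a Deserializer<Supplier>:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the Supplier serializer/deserializer pair.
class SupplierCodecSketch {

    // Hypothetical record: the start date is text here to avoid time-zone handling.
    static final class SupplierData {
        final int id;
        final String name;
        final String startDate;

        SupplierData(int id, String name, String startDate) {
            this.id = id;
            this.name = name;
            this.startDate = startDate;
        }
    }

    // Layout: [id:int][nameLen:int][name bytes][dateLen:int][date bytes]
    static byte[] serialize(SupplierData s) {
        byte[] name = s.name.getBytes(StandardCharsets.UTF_8);
        byte[] date = s.startDate.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + name.length + 4 + date.length);
        buf.putInt(s.id);
        buf.putInt(name.length);
        buf.put(name);
        buf.putInt(date.length);
        buf.put(date);
        return buf.array();
    }

    // Reads the fields back in exactly the order serialize() wrote them.
    static SupplierData deserialize(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        int id = buf.getInt();
        byte[] name = new byte[buf.getInt()];
        buf.get(name);
        byte[] date = new byte[buf.getInt()];
        buf.get(date);
        return new SupplierData(id, new String(name, StandardCharsets.UTF_8),
                new String(date, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(new SupplierData(101, "Xyz Pvt Ltd.", "2016-04-01"));
        SupplierData back = deserialize(bytes);
        System.out.println(bytes.length + " bytes -> id=" + back.id + " name=" + back.name
                + " startDate=" + back.startDate);
    }
}
```

Because both sides must agree on the field order, any change to the layout has to be made in the serializer and the deserializer together.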







7. KAFKA : Sample Program5 : With Future object return



//Future object -SynchronousProducer.java (get())
#####################################-
-> send() returns a Future<RecordMetadata>; get() blocks until the broker acknowledges the message and returns the RecordMetadata, which tells which partition (and offset) the message went to...
metadata.partition()
metadata.offset()

-----------------------------------------------
Key is : Key1
Message is sent to Partition no 4 and offset 1
SynchronousProducer Completed with success.
----------------------------------------------

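Run the program any number of times and the message goes to the same partition (i.e. Partition no 4), because the key is always Key1: the default partitioner hashes the serialized key bytes (murmur2, not "Key1".hashCode()), and the same key always gives the same hash, as long as the number of partitions does not change.

Change the key and the message can go to a different partition, because the partition is decided based on the key.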

//changed key to -> String key = "Key12345";

-----------------------------------------------
Key is : Key12345
Message is sent to Partition no 0 and offset 0
SynchronousProducer Completed with success.
----------------------------------------------




6. KAFKA : Sample Program4 : 1 Node - Multiple brokers





Kafka Cluster
-------------------

1 Node - 1 Broker

1 Node - Multiple brokers

Multiple Nodes - Multiple Brokers


We will try - 1 Node - Multiple brokers
#####################################-
config/server.properties
Broker id : broker.id=0
Log dir   : log.dirs=/tmp/kafka-logs
ip:port   : listeners=PLAINTEXT://:9092

config/server1.properties (copy of server.properties)
Broker id : broker.id=1
Log dir   : log.dirs=/tmp/kafka-logs2
ip:port   : listeners=PLAINTEXT://:9093

config/server2.properties (copy of server.properties)
Broker id : broker.id=2
Log dir   : log.dirs=/tmp/kafka-logs3
ip:port   : listeners=PLAINTEXT://:9094



------------------------------------------------------------
bin/kafka-server-start.sh config/server.properties
bin/kafka-server-start.sh config/server1.properties
bin/kafka-server-start.sh config/server2.properties

//create topic with replication factor 3 (because 3 brokers are running, each partition can be replicated on all 3)
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic NewTopic --partitions 5 --replication-factor 3

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic NewTopic

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic NewTopic


-----------------------------------------------------------




5. KAFKA : Sample Program3 : Custom Java Producer



Custom Producer
#####################################-
In VMWare Workstation -> Kafka VM
Start Zookeeper
Start Broker
Create topic "MyFirstTopic"
Start consumer with that topic "MyFirstTopic"

Open Eclipse  (create Custom Producer and Run)
-----------------------------------------------
-> choose a workspace
-> Create normal Java Project "KafkaProject"
-> Create java class -> SimpleProducer.java
-> take some java files from -> workspaceKafka-20200110T040943Z-001\workspaceKafka\MyKAfkaProject\src\main\java\KafkaCode
-> copy SimpleProducer.java to your eclipse SimpleProducer.java
-> Make sure the topic name and broker port in the code match your setup
-> Run java Application
-> Verify in consumer terminal whether value is appearing or not
//we should see "Value-2" in consumer terminal





########################################################################

package KafkaCode;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) throws Exception {

        String topicName = "MyFirstTopic";
        String key = "Key1";
        String value = "Value-2";

        Properties props = new Properties();
        // Brokers to contact first; the client discovers the rest of the cluster from them
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        // Key and value are Strings, converted to byte arrays by Kafka's StringSerializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
        // send() is asynchronous; close() blocks until previously sent records have completed
        producer.send(record);
        producer.close();

        System.out.println("SimpleProducer Completed.");
    }
}

########################################################################

Producer Flow (SimpleProducer.java)
#####################

Serializer - configured separately for key and value (key.serializer, value.serializer)
we choose which Serializer does the serialization (here we use Kafka's StringSerializer)
- converts the key and value to byte arrays


Producer(props)
ProducerRecord(topicName, key, value)
send(ProdRec)

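//we can also pass the partition# explicitly: new ProducerRecord<>(topicName, partition, key, value)
//otherwise the partition# is decided based on the key (the serialized key bytes, not key.hashCode()):
partition = toPositive(murmur2(keyBytes)) % number of partitions
//(records with a null key are spread across the partitions instead)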
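For a non-null key, Kafka's default partitioner (2.x) computes toPositive(murmur2(keyBytes)) % numPartitions. The rule can be sketched in plain Java; the class name KeyPartitionSketch is hypothetical, and murmur2() below is written to follow the routine in org.apache.kafka.common.utils.Utils, so compare it with the real Utils class before relying on exact partition numbers. toPositive() clears the sign bit instead of calling Math.abs, because Math.abs(Integer.MIN_VALUE) is still negative:

```java
import java.nio.charset.StandardCharsets;

// Sketch of default key -> partition mapping for a non-null String key.
class KeyPartitionSketch {

    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the last 1-3 bytes (the cases fall through on purpose).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit; never returns a negative number.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // StringSerializer turns the key into UTF-8 bytes; those bytes are what gets hashed.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"Key1", "Key12345"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 5) + " of 5");
        }
    }
}
```

Because the result is taken modulo the partition count, increasing the partitions of a topic (kafka-topics.sh --alter --partitions) can move an existing key to a different partition.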

Thursday, January 9, 2020

4. KAFKA : Sample Program2




Step1 : Download Vmware workstation


                ********@gmail.com  
                PAVAN KUMAR MR
                ********    
 
 

Step2: Download Kafka_VM.ova


Download Kafka_VM.ova from - https://drive.google.com/file/d/1UVqRyH10vGEh6rOi-0ivpsIS2KLIiYVF/view?ts=5e16bae0



Step3: Open kafka VM in VMWare workstation








Open a Terminal in Kafka_VM and execute the commands below:

export http_proxy=www-**************.com:80
export https_proxy=www-*************.com:80
//check that internet access through the proxy works
wget www.google.com
//download Kafka directly inside the VM, then extract it
wget http://www-us.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
tar -xzf kafka_2.12-2.2.1.tgz



//inside kafka_2.12-2.2.1/config
zookeeper.properties    //ZooKeeper can also be downloaded and run independently
dataDir=/tmp/zookeeper   //where ZooKeeper physically stores all znode information (default under /tmp)
clientPort=2181  //port ZooKeeper listens on for clients


server.properties //one server.properties per broker
broker.id=0 //mandatory: must be unique for every broker in the cluster
listeners=PLAINTEXT://:9092 //9092 is the port producers and consumers use to communicate with the broker
log.dirs=/tmp/kafka-logs   //where the broker stores partition data (log segments)
log.retention.hours=168   //keep data for 168 hours (7 days) before old segments become eligible for deletion
log.segment.bytes=1073741824   //each partition log is split into segment files; when the active segment reaches this size (1 GB), a new segment file is started
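Kafka and ZooKeeper read these files as Java properties files, so the // notes above are annotations only: in a real file a comment must be a separate line starting with # or !, and anything written after a value on the same line becomes part of that value. A small sketch that loads some of the lines above with java.util.Properties (the class name ServerPropertiesSketch is hypothetical):

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Loads .properties text exactly as java.util.Properties parses it.
class ServerPropertiesSketch {

    static Properties load(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected when reading from a String
        }
        return p;
    }

    static long retentionDays(Properties p) {
        return Long.parseLong(p.getProperty("log.retention.hours")) / 24;
    }

    public static void main(String[] args) {
        Properties server = load(String.join("\n",
                "# a comment is a whole line starting with # or !",
                "broker.id=0",
                "listeners=PLAINTEXT://:9092",
                "log.dirs=/tmp/kafka-logs",
                "log.retention.hours=168",
                "log.segment.bytes=1073741824"));
        System.out.println("listeners = " + server.getProperty("listeners"));
        System.out.println("retention = " + retentionDays(server) + " days");
        System.out.println("segment   = "
                + Long.parseLong(server.getProperty("log.segment.bytes")) / (1024L * 1024 * 1024) + " GiB");

        // A trailing note is NOT a comment: it becomes part of the value.
        Properties zk = load("clientPort=2181  //on which port zookeeper is running...");
        System.out.println("clientPort = [" + zk.getProperty("clientPort") + "]");
    }
}
```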

################################################################################

Open Terminal (Start Zookeeper -> Start Kafka -> Create Topic "MyFirstTopic" -> Start Producer on the created Topic -> Start Consumer on the created Topic)
//Start zookeeper & kafka server
cd kafka_2.12-2.2.1
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

bin/kafka-topics.sh --zookeeper localhost:2181 --list 

//Below command will fail because the replication factor (2) is greater than the number of running brokers (1)
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic MyFirstTopic --partitions 3 --replication-factor 2   

//Create Topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic MyFirstTopic --partitions 3 --replication-factor 1




//Describe Topic
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic MyFirstTopic




//Increase the number of partitions for a particular topic
//partitions can only be increased, never decreased
//this works even while producers/consumers are running (keyed messages may then map to a different partition)
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyFirstTopic --partitions 5

For testing - console based Producer/Consumer
---------------------------------
Terminal-1 (Producer)
--------------------
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic MyFirstTopic

Terminal-2 (Consumer)
---------------------
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic MyFirstTopic

################################################################################