Monday, July 27, 2020

8 KAFKA TRY: Realtime Kafka [ Producer Python (Read Twitter data) -> Broker -> Consumer Console ]


Refer video - https://www.youtube.com/watch?v=jE9T6g9sSPI


Sunday, July 19, 2020

7 KAFKA TRY: Practice Kafka [ Producer Java -> Multi Broker -> Consumer Java ]





Producer


#####################################################################


package KafkaCode;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) throws Exception {

        String topicName = "myfirstTopic1";
        String key = "Key1";
        String value = "Value - 1";

        // Broker address plus serializers for the String key and value
        Properties props = new Properties();
        props.put("bootstrap.servers", "slc15atx.us.oracle.com:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        // Fire-and-forget send; close() flushes buffered records before returning
        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
        producer.send(record);
        producer.close();

        System.out.println("SimpleProducer Completed.");
    }
}

#########################################################################

Consumer


##########################################################################
package KafkaCode;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
    public static void main(String[] args) throws Exception {

        String topicName = "myfirstTopic1";
        String groupName = "myfirstTopic1Group";

        Properties props = new Properties();
        props.put("bootstrap.servers", "slc15atx.us.oracle.com:9092,slc15atx.us.oracle.com:9093");
        props.put("group.id", groupName);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // The loop below never exits, so the consumer is never closed
        @SuppressWarnings("resource")
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topicName));

        while (true) {
            // poll(Duration) replaces the deprecated poll(long); wait up to 100 ms for records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Value = " + record.value());
            }
        }
    }
}


##########################################################################
















6. KAFKA TRY : Producer Java : With Future object return


//Future object - SynchronousProducer.java (get())
#####################################-
 -> send() returns a Future object; calling get() on it blocks until the broker responds and returns the metadata of the msg (which partition it went to, and at which offset)
metadata.partition()
metadata.offset()

O/p -----------------------------------------------
Key is : Key1
Key Hashcode : 2335186
Message is sent to Partition no 4 and offset 1
SynchronousProducer Completed with success.
----------------------------------------------

Run the program any number of times and the message will go to the same partition (i.e. Partition no 4), because the key is the same (Key1) and the partition is computed from a hash of the key, which is always the same for the same key. (Kafka's default partitioner hashes the serialized key bytes rather than using String.hashCode(), but the result is just as deterministic.)

Change the key -> the message may then go to a different partition, because the partition is decided based on the key

//changed key to -> String key = "Key12345";

O/p-----------------------------------------------
Key is : Key12345
Message is sent to Partition no 0 and offset 0
SynchronousProducer Completed with success.
----------------------------------------------
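
The difference between a plain send() and send().get() can be tried out without a broker. The sketch below is only a stand-in, not the Kafka client: FakeMetadata and fakeSend are hypothetical names, and the partition/offset values are hard-coded to mimic the first output above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

class FutureGetSketch {

    // Hypothetical stand-in for Kafka's RecordMetadata (partition + offset only).
    static final class FakeMetadata {
        final int partition;
        final long offset;

        FakeMetadata(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Hypothetical stand-in for producer.send(record): returns immediately
    // with a Future that is completed on another thread.
    static Future<FakeMetadata> fakeSend(String key, String value) {
        return CompletableFuture.supplyAsync(() -> new FakeMetadata(4, 1L));
    }

    public static void main(String[] args) throws Exception {
        // Asynchronous style: the caller only holds the Future and can move on.
        Future<FakeMetadata> pending = fakeSend("Key1", "Value - 1");

        // Synchronous style: get() blocks until the result (or an exception) is available.
        FakeMetadata metadata = pending.get();
        System.out.println("Partition " + metadata.partition + ", offset " + metadata.offset);
    }
}
```

Calling get() after every send makes the producer wait for each message in turn, which is why the synchronous style is the slower of the two.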




--------------------------------------------------------------------------------------

package KafkaCode;

import java.util.*;
import org.apache.kafka.clients.producer.*;

public class SynchronousProducer {
    public static void main(String[] args) throws Exception {

        String topicName = "NewMultiBrokerTopic";
        String key = "Key1";
        String value = "Value - MultiBroker - Test1";

        Properties props = new Properties();
        props.put("bootstrap.servers", "slc15atx.us.oracle.com:9092,slc15atx.us.oracle.com:9093,slc15atx.us.oracle.com:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);

        try {
            // send() returns a Future; get() blocks until the broker acknowledges the record
            RecordMetadata metadata = producer.send(record).get();
            System.out.println("Key is : " + key);
            System.out.println("Key Hashcode : " + key.hashCode());
            System.out.println("Message is sent to Partition no " + metadata.partition() + " and offset " + metadata.offset());
            System.out.println("SynchronousProducer Completed with success.");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("SynchronousProducer failed with an exception");
        } finally {
            // Always release the producer's network resources
            producer.close();
        }
    }
}



--------------------------------------------------------------------------------------

5 KAFKA TRY: Practice Kafka [ Producer Java -> Multi Broker -> Consumer Console ]


We will try - 1 Node - Multiple brokers
#####################################-
config/server.properties
broker id   (broker.id=0)
log dir     (log.dirs=/tmp/kafka-logs)
ip:port     (listeners=PLAINTEXT://:9092)

config/server1.properties
broker id   (broker.id=1)
log dir     (log.dirs=/tmp/kafka-logs2)
ip:port     (listeners=PLAINTEXT://:9093)

config/server2.properties
broker id   (broker.id=2)
log dir     (log.dirs=/tmp/kafka-logs3)
ip:port     (listeners=PLAINTEXT://:9094)
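
To see at a glance what has to differ between the three broker files, here is a small sketch that builds just those three settings with java.util.Properties. The class and method names are made up for illustration; the property keys and values are the ones listed above.

```java
import java.util.Properties;

class MultiBrokerConfigSketch {

    // Builds only the settings that must be unique per broker on a shared machine.
    static Properties brokerOverrides(int brokerId, int port, String logDir) {
        Properties props = new Properties();
        props.setProperty("broker.id", String.valueOf(brokerId));
        props.setProperty("listeners", "PLAINTEXT://:" + port);
        props.setProperty("log.dirs", logDir);
        return props;
    }

    public static void main(String[] args) {
        // Log directories as used in the notes above
        String[] logDirs = {"/tmp/kafka-logs", "/tmp/kafka-logs2", "/tmp/kafka-logs3"};
        for (int id = 0; id < logDirs.length; id++) {
            Properties p = brokerOverrides(id, 9092 + id, logDirs[id]);
            System.out.println("broker.id=" + p.getProperty("broker.id")
                    + "  listeners=" + p.getProperty("listeners")
                    + "  log.dirs=" + p.getProperty("log.dirs"));
        }
    }
}
```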


Kafka : Running on a Linux machine (slc15atx.us.oracle.com)

Run Zookeeper - bin/zookeeper-server-start.sh config/zookeeper.properties

Run Brokers (each in its own terminal) -
        bin/kafka-server-start.sh config/server.properties
        bin/kafka-server-start.sh config/server1.properties
        bin/kafka-server-start.sh config/server2.properties

Create topic - bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic NewMultiBrokerTopic --partitions 5 --replication-factor 3

Describe topic - bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic NewMultiBrokerTopic

Run consumer -
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic NewMultiBrokerTopic

Producer : Write Java on Windows


------------------------------------------------------------------------------------------------------------


package KafkaCode;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) throws Exception {

        String topicName = "NewMultiBrokerTopic";
        String key = "Key1";
        String value = "Value - MultiBroker - Test";

        // All three brokers are listed so the client can bootstrap from any of them
        Properties props = new Properties();
        props.put("bootstrap.servers", "slc15atx.us.oracle.com:9092,slc15atx.us.oracle.com:9093,slc15atx.us.oracle.com:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        // Fire-and-forget send; close() flushes buffered records before returning
        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
        producer.send(record);
        producer.close();

        System.out.println("SimpleProducer Completed.");
    }
}



------------------------------------------------------------------------------------------------------------







4 KAFKA TRY: Practice Kafka [ Producer Java -> Single Broker -> Consumer Console ]


Kafka : Running on a Linux machine (slc15atx.us.oracle.com)

Run Zookeeper - bin/zookeeper-server-start.sh config/zookeeper.properties
Run Broker - bin/kafka-server-start.sh config/server.properties
Create topic - bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic myfirstTopic1
Run consumer - bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myfirstTopic1

Producer : Write Java in Eclipse on Windows


Create Java project - "Kafka_Try_2020"
Add all Kafka libraries from -> Kafka\kafka_2.13-2.4.0\libs
Write the Java program below - "SimpleProducer"
Make sure the topic name and broker ports in the code are correct

---------------------------------------------------------------------------------------------------

package KafkaCode;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) throws Exception {

        String topicName = "myfirstTopic1";
        String key = "Key1";
        String value = "Value-2";

        // Single-broker setup: only 9092 is running here; the 9093 entry
        // matters only if a second broker is started
        Properties props = new Properties();
        props.put("bootstrap.servers", "slc15atx.us.oracle.com:9092,slc15atx.us.oracle.com:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        // Fire-and-forget send; close() flushes buffered records before returning
        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
        producer.send(record);
        producer.close();

        System.out.println("SimpleProducer Completed.");
    }
}


---------------------------------------------------------------------------------------------------

3 KAFKA TRY: Practice Kafka on LINUX [ Producer Console -> Single Broker -> Consumer Console ]





Step1: Download Kafka and extract it

tar -xzf kafka_2.11-0.10.1.0.tgz

Step2: Start Zookeeper (Terminal1)

->  Zookeeper coordinates between brokers.
->  Zookeeper must be started before the broker/Kafka server.

cd kafka_2.11-0.10.1.0

//For Linux: bin/zookeeper-server-start.sh
//For Windows: bin\windows\zookeeper-server-start

bin/zookeeper-server-start.sh config/zookeeper.properties


    On Windows : if starting Zookeeper fails with an error like "Classpath is empty. Please build the project first e.g. by running 'gradlew jarAll'",
    the fix is described here - https://medium.com/@praveenkumarsingh/confluent-kafka-on-windows-how-to-fix-classpath-is-empty-cf7c31d9c787


Step3: Start Broker/Kafka server (Terminal2)

For Windows:
bin\windows\kafka-server-start.bat config\server.properties

For Linux:
bin/kafka-server-start.sh config/server.properties




Step4: Create topic (Terminal3)

For windows: 

bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic myfirstTopic1

For Linux:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic myfirstTopic1
 






//Check the list of topics created

bin/kafka-topics.sh --list --zookeeper localhost:2181

//Start a producer to send messages to myfirstTopic1 (Terminal 3)

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myfirstTopic1

//Start a consumer to read messages from myfirstTopic1 (Terminal 4)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myfirstTopic1

Saturday, July 18, 2020

14. KAFKA : Kafka Theory More...


https://drive.google.com/file/d/1iode3dT2ziL5d0pxoA_294plrUDh6twv/view?usp=sharing

Kafka (Distributed messaging system)

#####################################

- Independent tool, nothing to do with HDFS
- Runs on its own cluster

Other Messaging systems
------------------------------------
Kafka -> stores the messages sent to it on multiple machines
JMS -> not distributed
Flume -> once a msg comes in, it is always written to HDFS

Producer    Consumer
----------------------------
        Database
        Application

Receiver
-----------
Requirement : Capturing the stream:

    Expectation : Receiver must run all the time
              Data processing

Normal Spark Receiver/Flume:
---------------------
A normal Spark Receiver runs on commodity hardware; if it goes down another one comes up, but until then no one is there to capture the incoming data
If the incoming data rate is too high, a normal Spark Receiver will burst (the same thing happened with Indian Railways)
It runs on one machine (JVM); we can run 2 Receivers, but both receive the same data, so there will be duplicate copies and more resources used
If the Receiver is configured for every 5 sec, processing must finish within that time so that it can take the next batch, but if the processing itself takes 10 sec then ...

Kafka/RabbitMQ:
---------------------
Kafka -
It's not Master/slave
Fault tolerance : Replication of messages
Whatever msg we send is stored on the backend in binary form, as files on the file system
Able to handle millions of messages per sec easily (LinkedIn gets millions per sec)
Stream processing is available (Confluent Kafka) : later Kafka also started providing its own processing engine (similar to Spark)
Retention : by default it preserves the streams for 7 days, as arrays of bytes; a message is not removed whether or not a consumer has consumed it, only once a retention limit is reached:
    - retention time is over
    - retention size is over
Messages (csv, txt, avro...)
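
The retention rule above can be written down as a tiny check. This is a simplified model with made-up names, not broker code: the real broker deletes whole log segments rather than single messages, and here a negative size limit is taken to mean "no size limit".

```java
import java.time.Duration;

class RetentionSketch {

    // A message becomes eligible for deletion once it is older than the retention
    // period, or once the log has grown past the size limit. Whether a consumer
    // has already read it plays no part in the decision.
    static boolean eligibleForDeletion(Duration age, long logSizeBytes,
                                       Duration retentionTime, long retentionBytes) {
        boolean timeExceeded = age.compareTo(retentionTime) > 0;
        boolean sizeExceeded = retentionBytes >= 0 && logSizeBytes > retentionBytes;
        return timeExceeded || sizeExceeded;
    }

    public static void main(String[] args) {
        Duration sevenDays = Duration.ofHours(168); // same value as log.retention.hours=168

        // 8 days old, no size limit: time retention is over
        System.out.println(eligibleForDeletion(Duration.ofDays(8), 10, sevenDays, -1));
        // 1 day old, no size limit: kept
        System.out.println(eligibleForDeletion(Duration.ofDays(1), 10, sevenDays, -1));
        // 1 day old, but the log is over a 1000-byte limit: size retention is over
        System.out.println(eligibleForDeletion(Duration.ofDays(1), 2000, sevenDays, 1000));
    }
}
```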

Header
----------
Body

1 Tweet (1 msg must be converted to a Byte Array)  ->   Kafka (always receives a Byte array)   ->   Stream   -----> Poll (Time : 100ms, size: 100kb)
                                                        If message size is > 100 kb => it will throw an error, so we need to increase the allowed message size
                                                        If message size is < 100 kb => other messages also come along with it in the same poll
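
The "message to byte array" step is roughly what a String serializer/deserializer pair does (the StringSerializer used in these notes encodes as UTF-8 by default). A minimal sketch with made-up method names; the 100 KB limit is just the figure from the notes above:

```java
import java.nio.charset.StandardCharsets;

class ByteArraySketch {

    // Producer side: text message -> byte array
    static byte[] serialize(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: byte array -> text message
    static String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Size check against a configured limit, counted in bytes (not characters)
    static boolean fitsLimit(byte[] payload, int maxBytes) {
        return payload.length <= maxBytes;
    }

    public static void main(String[] args) {
        byte[] payload = serialize("Hello Kafka");
        System.out.println("bytes on the wire : " + payload.length);
        System.out.println("fits in 100 KB    : " + fitsLimit(payload, 100 * 1024));
        System.out.println("read back         : " + deserialize(payload));
    }
}
```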

Every instance of Kafka is called => Broker
Every broker should have a unique id (eg: B0, B1, B2)
Every broker should have a unique IP:Port (eg: IP:9092, IP:9093, IP:9094)
As of now there is NO limit on the number of brokers

Producer1 sends message (Topic-"sales") -> goes to a Queue (Partition), internally stored as a file
Producer2 sends message (Topic-"sales") ->

Zookeeper => keeps count of the Brokers; each Broker sends a ping to the Zookeeper server (heartbeat time : tick-tock)
In Kafka => Zookeeper is mandatory and comes first (means we have to start Zookeeper first, then the Kafka/broker server)
The heartbeat interval can be in milliseconds too
Znode - how Zookeeper keeps track of all brokers : every object maintained in Zookeeper is called a Znode
 eg : If Zookeeper is maintaining 3 brokers, then 3 Znodes
    Znode - Broker0(Id), Lastupdate,
    Znode - Broker1(Id), Lastupdate,
    Znode - Broker2(Id), Lastupdate,
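
The "broker id + last update" bookkeeping can be pictured with a small in-memory model. This is not the Zookeeper API, only a sketch of the idea; the class name and the 6-second timeout in main are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class BrokerLivenessSketch {

    // brokerId -> time of last heartbeat in ms (stand-in for the znode's "Lastupdate")
    private final Map<Integer, Long> lastHeartbeatMs = new HashMap<>();
    private final long sessionTimeoutMs;

    BrokerLivenessSketch(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // Called whenever a broker pings
    void heartbeat(int brokerId, long nowMs) {
        lastHeartbeatMs.put(brokerId, nowMs);
    }

    // A broker counts as alive if its last ping is within the timeout
    boolean isAlive(int brokerId, long nowMs) {
        Long last = lastHeartbeatMs.get(brokerId);
        return last != null && nowMs - last <= sessionTimeoutMs;
    }

    long aliveCount(long nowMs) {
        return lastHeartbeatMs.keySet().stream().filter(id -> isAlive(id, nowMs)).count();
    }

    public static void main(String[] args) {
        BrokerLivenessSketch tracker = new BrokerLivenessSketch(6000);
        tracker.heartbeat(0, 0);     // Broker0 pinged at t=0 and then went silent
        tracker.heartbeat(1, 5000);  // Broker1 pinged at t=5s
        System.out.println("alive at t=7s : " + tracker.aliveCount(7000));
    }
}
```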

Kafka => needs Java (Java 8 or later for the Kafka 2.x builds used in these notes)

Images###########
export http_proxy=www-proxy.us.oracle.com:80
export https_proxy=www-proxy.us.oracle.com:80
wget www.google.com
wget http://www-us.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz   //download Kafka directly onto the workstation

kafka_2.13-2.4.0\config
zookeeper.properties    //Zookeeper can also be downloaded independently
    dataDir=/tmp/zookeeper   //(stored in /tmp by default) - physically holds all Znode information
    clientPort=2181         //port on which Zookeeper is running


server.properties    //one server.properties per broker
    broker.id=0    //mandatory: must be unique per broker; the minimum property
    listeners=PLAINTEXT://:9092    //9092 is the port producers and consumers connect to
    log.dirs=/tmp/kafka-logs
    log.retention.hours=168    //7 days
    log.segment.bytes=1073741824   //1 GB: when a segment reaches this size it is closed and a new segment file is started

#################################################
//Start zookeeper & kafka server
cd kafka_2.12-2.2.1
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

bin/kafka-topics.sh --zookeeper localhost:2181 --list

//The command below fails because the replication factor (2) is greater than the number of brokers (1)
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic MyFirstTopic --partitions 3 --replication-factor 2  

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic MyFirstTopic --partitions 3 --replication-factor 1

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic MyFirstTopic

//Increase the partitions for a particular topic
//The partition count cannot be decreased
//Partitions can be increased even while producers/consumers are running
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyFirstTopic --partitions 5
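
The two rules seen in the commands above (replication factor cannot exceed the number of brokers; partitions can only be increased) can be summed up as two small checks. The names are made up for illustration; this only mirrors the rules, it does not talk to Kafka.

```java
class TopicRulesSketch {

    // Each replica of a partition needs its own broker,
    // so the replication factor cannot exceed the broker count.
    static boolean canCreate(int replicationFactor, int liveBrokers) {
        return replicationFactor >= 1 && replicationFactor <= liveBrokers;
    }

    // --alter --partitions only accepts an increase.
    static boolean canAlterPartitions(int currentPartitions, int requestedPartitions) {
        return requestedPartitions > currentPartitions;
    }

    public static void main(String[] args) {
        System.out.println("RF 2 on 1 broker   : " + canCreate(2, 1));
        System.out.println("RF 1 on 1 broker   : " + canCreate(1, 1));
        System.out.println("partitions 3 -> 5  : " + canAlterPartitions(3, 5));
        System.out.println("partitions 5 -> 3  : " + canAlterPartitions(5, 3));
    }
}
```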

For testing - console based Producer/Consumer
---------------------------------
Terminal-1 (Producer)
--------------------
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic MyFirstTopic

Terminal-2 (Consumer)
---------------------
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic MyFirstTopic

#################################################

For every partition a leader is elected; only the leader receives messages from producers and serves them to consumers on demand
Eg: if there are 3 brokers
    partition0 -> 1 Leader(Brok2), 2 followers
    partition1 -> 1 Leader(Brok1), 2 followers
    partition2 -> 1 Leader(Brok3), 2 followers
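
To picture how leaders and followers spread over the brokers, here is a simplified round-robin layout. It is only an illustration with made-up names: the broker chooses the real placement itself, so the actual leaders (as in the example above) will usually differ from what this prints.

```java
import java.util.ArrayList;
import java.util.List;

class LeaderAssignmentSketch {

    // Returns the broker ids holding a partition; element 0 is treated as the leader,
    // the remaining elements as followers.
    static List<Integer> replicasFor(int partition, int brokerCount, int replicationFactor) {
        List<Integer> replicas = new ArrayList<>();
        for (int i = 0; i < replicationFactor; i++) {
            replicas.add((partition + i) % brokerCount);
        }
        return replicas;
    }

    public static void main(String[] args) {
        int brokers = 3;
        int replicationFactor = 3;
        for (int partition = 0; partition < 3; partition++) {
            List<Integer> replicas = replicasFor(partition, brokers, replicationFactor);
            System.out.println("partition" + partition
                    + " -> leader broker " + replicas.get(0)
                    + ", followers " + replicas.subList(1, replicas.size()));
        }
    }
}
```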


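Kafka => message order is guaranteed only within a partition: a consumer reading several partitions may not receive messages in the order they were produced, but within each partition they are stored and read in the order received

Kafka monitoring tool => just to monitor what is there in the Kafka partitions

Producer can choose the acknowledgement level (acks) it wants   ->   Leader receives the message
    - 0 : Fast : (Fire and forget, no acknowledgement)
    - 1 : Slow : (Once the Leader has written the message, the Leader acknowledges)
    - all : Slowest : (The message is written to the Leader and its in-sync followers, then the Leader acknowledges) - Banking sector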
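
In the Java producer the acknowledgement level is set through the "acks" property. The sketch below only builds the Properties object (no Kafka jars needed); the helper name is made up, and the validation is deliberately strict for illustration, accepting just the three values from the list above.

```java
import java.util.Properties;

class AcksConfigSketch {

    // Builds producer properties with the chosen acknowledgement level.
    static Properties producerProps(String bootstrapServers, String acks) {
        if (!acks.equals("0") && !acks.equals("1") && !acks.equals("all")) {
            throw new IllegalArgumentException("acks must be 0, 1 or all, got: " + acks);
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", acks);
        return props;
    }

    public static void main(String[] args) {
        // Slowest but safest setting, as noted above for the banking sector
        Properties props = producerProps("localhost:9092", "all");
        System.out.println("acks = " + props.getProperty("acks"));
    }
}
```

The returned Properties can be passed to new KafkaProducer<>(props) exactly like in the producer programs earlier in these notes.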

Ways to Send message from Producer

Topic : "MyMessage"
---------

Message - Hello   (value only)

Message - India, Hello   (key, value)
      partition = key.hashCode % no. of partitions (illustrative hash values below)
        100 % 4 = 0    -- will go to Partition P0

       Pak, Hello
        41 % 4 = 1      -- will go to Partition P1

       Nepal, Hello
        41 % 4 = 1      -- will go to Partition P1

Message - K, V, P#   (key, value, partition number)

Message - K, V, P#, TS   (key, value, partition number, timestamp)


Consumer maintains the 2 offsets below, so that for the next read the consumer can ask...
############################################

1> Read offset
2> Commit offset
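
A small in-memory model of the two offsets. It is not the KafkaConsumer API; the class and method names are made up to show how the read position and the committed offset move independently.

```java
class OffsetTrackingSketch {

    private long position = 0;   // read offset: next record to fetch
    private long committed = 0;  // commit offset: where to resume after a restart

    // Reads up to maxRecords from a log that currently ends at logEndOffset;
    // returns how many records were read.
    int poll(long logEndOffset, int maxRecords) {
        long available = Math.max(0, logEndOffset - position);
        int read = (int) Math.min(maxRecords, available);
        position += read;
        return read;
    }

    // Marks everything read so far as processed
    void commit() {
        committed = position;
    }

    // After a crash/restart, reading resumes from the last committed offset
    void restart() {
        position = committed;
    }

    long position() {
        return position;
    }

    long committed() {
        return committed;
    }

    public static void main(String[] args) {
        OffsetTrackingSketch consumer = new OffsetTrackingSketch();
        consumer.poll(10, 4);   // reads offsets 0..3
        consumer.commit();      // committed = 4
        consumer.poll(10, 4);   // reads offsets 4..7, not yet committed
        System.out.println("read offset = " + consumer.position() + ", commit offset = " + consumer.committed());
        consumer.restart();     // offsets 4..7 will be read again
        System.out.println("after restart, read offset = " + consumer.position());
    }
}
```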

Status of Broker is maintained by zookeeper