Wednesday, September 30, 2020

Monday, July 27, 2020

8 KAFKA TRY: Realtime Kafka [ Producer Python (Read Twitter data) -> Broker -> Consumer Console ]


Reference video: https://www.youtube.com/watch?v=jE9T6g9sSPI


Sunday, July 19, 2020

7 KAFKA TRY: Practice Kafka [ Producer Java -> Multi Broker -> Consumer Java ]





Producer


#####################################################################


package KafkaCode;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) throws Exception {

        String topicName = "myfirstTopic1";
        String key = "Key1";
        String value = "Value - 1";

        // Connection and serialization settings for the producer.
        Properties props = new Properties();
        props.put("bootstrap.servers", "slc15atx.us.oracle.com:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        // Fire-and-forget send: the returned Future is ignored.
        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
        producer.send(record);
        // close() flushes any buffered records before shutting down.
        producer.close();

        System.out.println("SimpleProducer Completed.");
    }
}

#########################################################################

Consumer


##########################################################################
package KafkaCode;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
    public static void main(String[] args) throws Exception {

        String topicName = "myfirstTopic1";
        String groupName = "myfirstTopic1Group";

        Properties props = new Properties();
        props.put("bootstrap.servers", "slc15atx.us.oracle.com:9092,slc15atx.us.oracle.com:9093");
        props.put("group.id", groupName);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // The loop below never exits, so the consumer is intentionally never closed.
        @SuppressWarnings("resource")
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topicName));

        while (true) {
            // poll(Duration) replaces the deprecated poll(long) (Kafka clients 2.0+).
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Value = " + record.value());
            }
        }
    }
}


##########################################################################
















6. KAFKA TRY : Producer Java : With Future object return


// Future object - SynchronousProducer.java (get())
#####################################-
 -> send() returns a Future<RecordMetadata>. Calling get() on it blocks until the broker responds and returns the RecordMetadata, which tells us which partition the message went to:
metadata.partition()
metadata.offset()
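
The blocking behaviour of get() can be tried without a broker. The sketch below is only a stand-in built on java.util.concurrent: fakeSend and Meta are hypothetical names (not Kafka APIs) that mimic how send() hands back a Future immediately while get() waits for the result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class FutureGetSketch {

    // Hypothetical stand-in for Kafka's RecordMetadata (not the real class).
    static final class Meta {
        final int partition;
        final long offset;

        Meta(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Hypothetical stand-in for producer.send(record): returns immediately,
    // and the result is completed a little later on another thread.
    static CompletableFuture<Meta> fakeSend(int partition, long offset) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(50); // simulate the network round trip
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return new Meta(partition, offset);
        });
    }

    public static void main(String[] args) throws Exception {
        Future<Meta> future = fakeSend(4, 1L); // does not wait for the result
        Meta meta = future.get();              // blocks until the result is ready
        System.out.println("Partition " + meta.partition + ", offset " + meta.offset);
    }
}
```

Waiting on get() after every send keeps things simple but limits throughput, since only one message is in flight at a time.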

O/p -----------------------------------------------
Key is : Key1
Key Hashcode : 2335186
Message is sent to Partition no 4 and offset 1
SynchronousProducer Completed with success.
----------------------------------------------

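Run the program any number of times and the message goes to the same partition (here Partition no 4), because the key stays Key1. Kafka's default partitioner hashes the serialized key bytes (murmur2, not String.hashCode()) and takes the result modulo the number of partitions, so the same key always maps to the same partition as long as the partition count is unchanged. The hashcode printed above is informational only.

Change the key so that the message can go to a different partition, because the partition is decided based on the key. (Two different keys can still land on the same partition.)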

//changed key to -> String key = "Key12345";

O/p-----------------------------------------------
Key is : Key12345
Message is sent to Partition no 0 and offset 0
SynchronousProducer Completed with success.
----------------------------------------------
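
The key-to-partition idea can be sketched in plain Java. This is a simplified illustration, not Kafka's code: partitionFor is a hypothetical helper and it uses a toy hash instead of murmur2, so the partition numbers it prints will not match the real cluster output above. It only shows that the same key always gives the same partition.

```java
import java.nio.charset.StandardCharsets;

public class KeyPartitionSketch {

    // Simplified stand-in for a key-based partitioner: hash the key bytes,
    // force the hash non-negative, then take it modulo the partition count.
    // NOTE: Kafka's default partitioner uses murmur2, not this toy hash.
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = 17;
        for (byte b : bytes) {
            hash = 31 * hash + b;
        }
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 5; // same partition count as NewMultiBrokerTopic
        for (String key : new String[] {"Key1", "Key1", "Key12345"}) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

Because the result depends on the partition count, adding partitions to a topic later can move an existing key to a different partition.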




--------------------------------------------------------------------------------------

package KafkaCode;

import java.util.*;
import org.apache.kafka.clients.producer.*;

public class SynchronousProducer {
    public static void main(String[] args) throws Exception {

        String topicName = "NewMultiBrokerTopic";
        String key = "Key1";
        String value = "Value - MultiBroker - Test1";

        Properties props = new Properties();
        props.put("bootstrap.servers", "slc15atx.us.oracle.com:9092,slc15atx.us.oracle.com:9093,slc15atx.us.oracle.com:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);

        try {
            RecordMetadata metadata = producer.send(record).get();
            System.out.println("Key is : " + key);
            System.out.println("Key Hashcode : " + key.hashCode());
            System.out.println("Message is sent to Partition no " + metadata.partition() + " and offset " + metadata.offset());
            System.out.println("SynchronousProducer Completed with success.");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("SynchronousProducer failed with an exception");
        } finally {
            producer.close();
        }
    }
}



--------------------------------------------------------------------------------------

5 KAFKA TRY: Practice Kafka [ Producer Java -> Multi Broker -> Consumer Console ]


We will try - 1 node, multiple brokers
#####################################-
Each broker gets its own properties file with a unique broker id, log dir and port.

config/server.properties
broker id   broker.id=0
log dir     log.dirs=/tmp/kafka-logs
ip:port     listeners=PLAINTEXT://:9092

config/server1.properties
broker id   broker.id=1
log dir     log.dirs=/tmp/kafka-logs2
ip:port     listeners=PLAINTEXT://:9093

config/server2.properties
broker id   broker.id=2
log dir     log.dirs=/tmp/kafka-logs3
ip:port     listeners=PLAINTEXT://:9094
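
When several brokers share one machine, a copy-paste slip in any of these three settings is an easy mistake. Below is a small sanity-check sketch in plain Java. BrokerConfigCheck, broker and allDistinct are hypothetical helper names, and the values are the ones listed above; it only checks that no two brokers share a broker.id, log.dirs or listeners value.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

public class BrokerConfigCheck {

    // Build the three settings that must differ per broker on one node.
    static Properties broker(int id, String logDir, int port) {
        Properties p = new Properties();
        p.setProperty("broker.id", Integer.toString(id));
        p.setProperty("log.dirs", logDir);
        p.setProperty("listeners", "PLAINTEXT://:" + port);
        return p;
    }

    // Returns true when no two brokers share a value for the given key.
    static boolean allDistinct(List<Properties> brokers, String key) {
        Set<String> seen = new HashSet<>();
        for (Properties p : brokers) {
            if (!seen.add(p.getProperty(key))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Properties> brokers = Arrays.asList(
            broker(0, "/tmp/kafka-logs", 9092),
            broker(1, "/tmp/kafka-logs2", 9093),
            broker(2, "/tmp/kafka-logs3", 9094));
        for (String key : new String[] {"broker.id", "log.dirs", "listeners"}) {
            System.out.println(key + " distinct: " + allDistinct(brokers, key));
        }
    }
}
```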


Kafka : Running on a Linux machine (slc15atx.us.oracle.com)

Run ZooKeeper -
        bin/zookeeper-server-start.sh config/zookeeper.properties

Run brokers (each command runs in the foreground, so use a separate terminal for each) -
        bin/kafka-server-start.sh config/server.properties
        bin/kafka-server-start.sh config/server1.properties
        bin/kafka-server-start.sh config/server2.properties

Create topic -
        bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic NewMultiBrokerTopic --partitions 5 --replication-factor 3

Describe topic -
        bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic NewMultiBrokerTopic

Run consumer -
        bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic NewMultiBrokerTopic

Producer : Java program written on Windows


------------------------------------------------------------------------------------------------------------


package KafkaCode;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) throws Exception {

        String topicName = "NewMultiBrokerTopic";
        String key = "Key1";
        String value = "Value - MultiBroker - Test";

        Properties props = new Properties();
        props.put("bootstrap.servers", "slc15atx.us.oracle.com:9092,slc15atx.us.oracle.com:9093,slc15atx.us.oracle.com:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
        producer.send(record);
        producer.close();

        System.out.println("SimpleProducer Completed.");
    }
}



------------------------------------------------------------------------------------------------------------