Sunday, July 19, 2020

6. KAFKA TRY : Producer Java : With Future object return

6. KAFKA TRY :  Producer Java : With Future object return


//Future object -SynchronousProducer.java (get())
#####################################-
 -> get() -> returns Future object which contains metadata of msg which partition it went in...
metadata.partition()
metadata.offset()

O/p -----------------------------------------------
Key is : Key1 Key Hashcode : 2335186 Message is sent to Partition no 4 and offset 1 SynchronousProducer Completed with success.
----------------------------------------------

Run program any number of times it will go to same partition (i.e Partition no 4), because Key = Key1 ("Key1".hashCode() will be always same)

Change the key -> so that it will got to different partition, bez which partition decides based on key

//changed key to -> String key = "Key12345";

O/p-----------------------------------------------
Key is : Key12345
Message is sent to Partition no 0 and offset 0
SynchronousProducer Completed with success.
----------------------------------------------




--------------------------------------------------------------------------------------

package KafkaCode;

import java.util.*;
import org.apache.kafka.clients.producer.*;

public class SynchronousProducer {
public static void main(String[] args) throws Exception{

      String topicName = "NewMultiBrokerTopic";
          String key = "Key1";
          String value = "Value - MultiBroker - Test1";

      Properties props = new Properties();
      props.put("bootstrap.servers", "slc15atx.us.oracle.com:9092,slc15atx.us.oracle.com:9093,slc15atx.us.oracle.com:9094");
      props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

      Producer<String, String> producer = new KafkaProducer <>(props);

          ProducerRecord<String, String> record = new ProducerRecord<>(topicName,key,value);

      try{
           RecordMetadata metadata = producer.send(record).get();
           System.out.println("Key is : "+key);
           System.out.println("Key Hashcode : "+key.hashCode());
           System.out.println("Message is sent to Partition no " + metadata.partition() + " and offset " + metadata.offset());
           System.out.println("SynchronousProducer Completed with success.");
      }catch (Exception e) {
           e.printStackTrace();
           System.out.println("SynchronousProducer failed with an exception");
      }finally{
           producer.close();
      }
   }
}



--------------------------------------------------------------------------------------

No comments:

Post a Comment