6. KAFKA TRY : Producer Java : With Future object return
//Future object -SynchronousProducer.java (get())
#####################################-
-> get() -> returns Future object which contains metadata of msg which partition it went in...
metadata.partition()
metadata.offset()
O/p -----------------------------------------------
Key is : Key1 Key Hashcode : 2335186 Message is sent to Partition no 4 and offset 1 SynchronousProducer Completed with success.
----------------------------------------------
Run program any number of times it will go to same partition (i.e Partition no 4), because Key = Key1 ("Key1".hashCode() will be always same)
Change the key -> so that it will got to different partition, bez which partition decides based on key
//changed key to -> String key = "Key12345";
O/p-----------------------------------------------
Key is : Key12345
Message is sent to Partition no 0 and offset 0
SynchronousProducer Completed with success.
----------------------------------------------
--------------------------------------------------------------------------------------
package KafkaCode;
import java.util.*;
import org.apache.kafka.clients.producer.*;
public class SynchronousProducer {
public static void main(String[] args) throws Exception{
String topicName = "NewMultiBrokerTopic";
String key = "Key1";
String value = "Value - MultiBroker - Test1";
Properties props = new Properties();
props.put("bootstrap.servers", "slc15atx.us.oracle.com:9092,slc15atx.us.oracle.com:9093,slc15atx.us.oracle.com:9094");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer <>(props);
ProducerRecord<String, String> record = new ProducerRecord<>(topicName,key,value);
try{
RecordMetadata metadata = producer.send(record).get();
System.out.println("Key is : "+key);
System.out.println("Key Hashcode : "+key.hashCode());
System.out.println("Message is sent to Partition no " + metadata.partition() + " and offset " + metadata.offset());
System.out.println("SynchronousProducer Completed with success.");
}catch (Exception e) {
e.printStackTrace();
System.out.println("SynchronousProducer failed with an exception");
}finally{
producer.close();
}
}
}
--------------------------------------------------------------------------------------
No comments:
Post a Comment