Tuesday, January 14, 2020

5. KAFKA : Sample Program3 : Custom Java Producer


Custom Producer
#####################################-
In VMWare Workstation -> Kafka VM
Start Zookeeper
Start Broker
Create topic "MyFirstTopic"
Start consumer with that topic "MyFirstTopic"

Open Eclipse  (create Custom Producer and Run)
-----------------------------------------------
-> choose a workspace
-> Create a normal Java Project "KafkaProject"
-> Create a Java class -> SimpleProducer.java
-> take the sample Java files from -> workspaceKafka-20200110T040943Z-001\workspaceKafka\MyKAfkaProject\src\main\java\KafkaCode
-> copy the contents of that SimpleProducer.java into your Eclipse SimpleProducer.java
-> Make sure the topic name and broker port in the code match your setup
-> Run as Java Application
-> Verify in the consumer terminal whether the value appears
//we should see "Value-2" in the consumer terminal





########################################################################

package KafkaCode;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) throws Exception {

        // Topic must already exist (created in the Kafka VM steps above)
        String topicName = "MyFirstTopic";
        String key = "Key1";
        String value = "Value-2";

        // Producer configuration: broker list and serializers for key and value
        Properties props = new Properties();
        // Adjust host:port to match your broker(s)
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create the producer, wrap key/value in a record, and send it
        Producer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
        producer.send(record);

        // close() waits for pending sends to complete before releasing resources
        producer.close();

        System.out.println("SimpleProducer Completed.");
    }
}

########################################################################

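Producer Flow (SimpleProducer.java)
#####################

Serializer - configured separately for key and value (key.serializer, value.serializer)
we specify which Serializer class does the serialization (here we use Kafka's StringSerializer)
- it converts the key and value to byte arrays before they are sent to the broker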
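
To make the "convert to byte array" step concrete, here is a minimal sketch that runs without Kafka on the classpath. The class name SerializerSketch and its serialize method are hypothetical stand-ins, not Kafka's API; they only mimic what a String serializer does with its default UTF-8 encoding.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-in for a String serializer (not Kafka's own class).
class SerializerSketch {

    // Encode the text as UTF-8 bytes; null stays null.
    static byte[] serialize(String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] keyBytes = serialize("Key1");
        byte[] valueBytes = serialize("Value-2");

        // Show the raw bytes that would travel to the broker
        System.out.println("key bytes   = " + Arrays.toString(keyBytes));
        System.out.println("value bytes = " + Arrays.toString(valueBytes));
    }
}
```

The consumer side needs the matching deserializer to turn these bytes back into Strings.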


Producer(props)
ProducerRecord(topicName, key, value)
send(ProdRec)

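//we can also pass the partition# explicitly in the constructor: ProducerRecord(topicName, partition, key, value)
//if no partition# is given, the partition is decided from the hash of the key
//default partitioner: hash(serialized key bytes) % number of partitions (Kafka uses a murmur2 hash, not Java's key.hashCode())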
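
The key-to-partition idea can be sketched as a hash of the key bytes taken modulo the partition count. This is only an illustration: PartitionSketch and partitionFor are hypothetical names, and Arrays.hashCode is a stand-in for the murmur2 hash Kafka actually uses, so the partition numbers printed here will not match a real cluster.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of key-based partitioning (not Kafka's partitioner).
class PartitionSketch {

    // Maps a key to a partition index in the range [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);   // stand-in hash, NOT murmur2
        int positive = hash & 0x7fffffff;       // clear the sign bit so the result is non-negative
        return positive % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3; // assumed partition count for the example
        for (String key : new String[] {"Key1", "Key2", "Key3"}) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

The point to take away is that the same key always maps to the same partition as long as the partition count does not change, which is what keeps records with one key in order.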
