5. KAFKA : Sample Program3 : Custom Java Producer
Custom Producer
#####################################-
In VMware Workstation -> Kafka VM
Start Zookeeper
Start the broker
Create the topic "MyFirstTopic"
Start a consumer on the topic "MyFirstTopic"
Open Eclipse (create the custom producer and run it)
-----------------------------------------------
-> Choose a workspace
-> Create a plain Java project "KafkaProject"
-> Create a Java class -> SimpleProducer.java
-> Take the Java source files from -> workspaceKafka-20200110T040943Z-001\workspaceKafka\MyKAfkaProject\src\main\java\KafkaCode
-> Copy the contents of that SimpleProducer.java into your Eclipse SimpleProducer.java
-> In the code, check that the topic name and broker port match your setup
-> Run as Java Application
-> Verify in the consumer terminal that the value appears
//we should see "Value-2" in the consumer terminal
########################################################################
package KafkaCode;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) throws Exception {
        // Topic must match the one the consumer is subscribed to.
        String topicName = "MyFirstTopic";
        String key = "Key1";
        String value = "Value-2";

        Properties props = new Properties();
        // Broker host:port list; adjust to the port(s) your broker listens on.
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        // Key and value are both Strings, so both use Kafka's StringSerializer.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
        // send() is asynchronous; close() blocks until pending records are sent.
        producer.send(record);
        producer.close();

        System.out.println("SimpleProducer Completed.");
    }
}
########################################################################
Producer Flow (SimpleProducer.java)
#####################
Serializer - chosen per key type and value type
we specify which serializer class to use for the key and for the value (here Kafka's StringSerializer)
- converts the key and value to byte arrays
Producer(props)
ProducerRecord(topicName, key, value)
send(ProdRec)
//we can also pass the partition# explicitly in the ProducerRecord constructor
//otherwise the partition# is decided from the key (a hash of the serialized key)
hash(key bytes) % no. of partitions
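The flow above (serialize the key, hash the bytes, take the result modulo the partition count) can be sketched in plain Java without the Kafka jars. This is only an illustration: the class name PartitionSketch, the method names and the partition count of 3 are assumptions made for this example, and Arrays.hashCode stands in for the murmur2 hash that Kafka's default partitioner applies to the key bytes, so the partition numbers printed here will not match what a real broker assigns.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionSketch {

    // Mimics what a String serializer does: turn the String into UTF-8 bytes.
    public static byte[] serialize(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Picks a partition from the serialized key.
    // Stand-in hash: Kafka's default partitioner uses murmur2 on the key bytes;
    // Arrays.hashCode is used here only to keep the sketch dependency-free.
    public static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(serialize(key));
        // Clear the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3; // assumed partition count for the illustration
        for (String key : new String[] {"Key1", "Key2", "Key3"}) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

The point to take away is that the same key always maps to the same partition as long as the partition count does not change, which is why records sharing a key keep their relative order.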