diff --git a/src/main/java/UnitTest/KafkaProductorTest.java b/src/main/java/UnitTest/KafkaProductorTest.java index 3e50759..ba999d6 100644 --- a/src/main/java/UnitTest/KafkaProductorTest.java +++ b/src/main/java/UnitTest/KafkaProductorTest.java @@ -6,7 +6,8 @@ import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; public class KafkaProductorTest { - public static void main(String[] args) throws Exception { + + protected static Properties init() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.14.67:9092"); /* @@ -42,20 +43,22 @@ public class KafkaProductorTest { props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //把发送消息value从字符串序列化为字节数组 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + return props; + } - Producer producer = new KafkaProducer<>(props); + public static void main(String[] args) throws Exception { + Producer producer = new KafkaProducer<>(init()); - int msgNum = 5; - for (int i = 1; i <= msgNum; i++) { + for (int i = 1; i <= 10; i++) { //指定发送分区 - String TOPIC_NAME="test"; + String TOPIC_NAME = "test"; ProducerRecord producerRecord = new ProducerRecord<>(TOPIC_NAME - , 0, "bbb", "aaa"); + , 0, "HuangHai_" + i, "黄海_" + i); //等待消息发送成功的同步阻塞方法 RecordMetadata metadata = producer.send(producerRecord).get(); System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" - + metadata.partition() + "|offset-" + metadata.offset()+"|order-"+i); + + metadata.partition() + "|offset-" + metadata.offset() + "|" + i); } producer.close(); }