main
黄海 2 years ago
parent 3b761b78a6
commit c97ed6c4cb

@ -6,7 +6,8 @@ import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
public class KafkaProductorTest {
public static void main(String[] args) throws Exception {
protected static Properties init() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.14.67:9092");
/*
@ -42,20 +43,22 @@ public class KafkaProductorTest {
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//把发送消息value从字符串序列化为字节数组
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return props;
}
Producer<String, String> producer = new KafkaProducer<>(props);
public static void main(String[] args) throws Exception {
Producer<String, String> producer = new KafkaProducer<>(init());
int msgNum = 5;
for (int i = 1; i <= msgNum; i++) {
for (int i = 1; i <= 10; i++) {
//指定发送分区
String TOPIC_NAME="test";
String TOPIC_NAME = "test";
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME
, 0, "bbb", "aaa");
, 0, "HuangHai_" + i, "黄海_" + i);
//等待消息发送成功的同步阻塞方法
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset()+"|order-"+i);
+ metadata.partition() + "|offset-" + metadata.offset() + "|" + i);
}
producer.close();
}

Loading…
Cancel
Save