main
黄海 2 years ago
parent c97ed6c4cb
commit 4656b5f94f

@ -7,7 +7,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
public class KafkaProductorTest { public class KafkaProductorTest {
protected static Properties init() { protected static Properties initConfig() {
Properties props = new Properties(); Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.14.67:9092"); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.14.67:9092");
/* /*
@ -47,7 +47,7 @@ public class KafkaProductorTest {
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
Producer<String, String> producer = new KafkaProducer<>(init()); Producer<String, String> producer = new KafkaProducer<>(initConfig());
for (int i = 1; i <= 10; i++) { for (int i = 1; i <= 10; i++) {
//指定发送分区 //指定发送分区

@ -1,6 +1,7 @@
package UnitTest; package UnitTest;
import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
@ -27,29 +28,19 @@ public class kafkaConsumerTest {
} }
protected static Properties initConfig() { protected static Properties initConfig() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.14.67:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "dsideal-group");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "dsideal1");
return properties;
}
public static void main(String[] args) throws ParseException {
Properties props = new Properties(); Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.14.67:9092"); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.14.67:9092");
String CONSUMER_GROUP_NAME="dsideal_group"; props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
String TOPIC_NAME="test"; props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 消费分组名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
// 是否自动提交offset默认就是true // 是否自动提交offset默认就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// 消费分组名
props.put(ConsumerConfig.GROUP_ID_CONFIG, "dsideal-group");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "dsideal1");
// 自动提交offset的间隔时间 // 自动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
/* /*
offsetoffset offsetoffset
latest() latest()
@ -67,7 +58,6 @@ public class kafkaConsumerTest {
*/ */
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
//一次poll最大拉取消息的条数如果消费者处理速度很快可以设置大点如果处理速度一般可以设置小点 //一次poll最大拉取消息的条数如果消费者处理速度很快可以设置大点如果处理速度一般可以设置小点
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
/* /*
@ -75,14 +65,22 @@ public class kafkaConsumerTest {
consumer consumer
*/ */
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); return props;
}
public static void main(String[] args) throws ParseException {
String TOPIC_NAME = "test";
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(initConfig());
//如果使用自动分区分配,那就不要使用手动分区分配,反之亦然,两者不可同时使用,使用一种方法,就注释掉另一种方法。
//如使用手动分区分配,那么注释掉自动分区分配。
//https://blog.csdn.net/qq_41413743/article/details/123636586
// consumer.subscribe(Arrays.asList(TOPIC_NAME));
consumer.subscribe(Arrays.asList(TOPIC_NAME));
// 消费指定分区 // 消费指定分区
//consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
//消息回溯消费 //消息回溯消费
/*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); /*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
@ -93,8 +91,9 @@ public class kafkaConsumerTest {
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);*/ consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);*/
//从指定时间点开始消费 //从指定时间点开始消费
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
/*List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME); /*
//从1小时前开始消费 //从1小时前开始消费
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60; long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>(); Map<TopicPartition, Long> map = new HashMap<>();
@ -114,8 +113,8 @@ public class kafkaConsumerTest {
consumer.assign(Arrays.asList(key)); consumer.assign(Arrays.asList(key));
consumer.seek(key, offset); consumer.seek(key, offset);
} }
}*/ }
*/
while (true) { while (true) {
/* /*
@ -147,4 +146,3 @@ public class kafkaConsumerTest {
} }
} }
} }
//https://blog.csdn.net/qq_39839075/article/details/105522855

Loading…
Cancel
Save