diff --git a/src/main/java/UnitTest/KafkaProductorTest.java b/src/main/java/UnitTest/KafkaProductorTest.java index ba999d6..9657f54 100644 --- a/src/main/java/UnitTest/KafkaProductorTest.java +++ b/src/main/java/UnitTest/KafkaProductorTest.java @@ -7,7 +7,7 @@ import org.apache.kafka.common.serialization.StringSerializer; public class KafkaProductorTest { - protected static Properties init() { + protected static Properties initConfig() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.14.67:9092"); /* @@ -47,7 +47,7 @@ public class KafkaProductorTest { } public static void main(String[] args) throws Exception { - Producer producer = new KafkaProducer<>(init()); + Producer producer = new KafkaProducer<>(initConfig()); for (int i = 1; i <= 10; i++) { //指定发送分区 diff --git a/src/main/java/UnitTest/kafkaConsumerTest.java b/src/main/java/UnitTest/kafkaConsumerTest.java index cd23d37..9368f3b 100644 --- a/src/main/java/UnitTest/kafkaConsumerTest.java +++ b/src/main/java/UnitTest/kafkaConsumerTest.java @@ -1,6 +1,7 @@ package UnitTest; import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; @@ -27,29 +28,19 @@ public class kafkaConsumerTest { } protected static Properties initConfig() { - Properties properties = new Properties(); - properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.14.67:9092"); - properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - properties.put(ConsumerConfig.GROUP_ID_CONFIG, "dsideal-group"); - properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "dsideal1"); - return properties; - } - - public static void main(String[] args) throws ParseException { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.14.67:9092"); - String CONSUMER_GROUP_NAME="dsideal_group"; - String TOPIC_NAME="test"; - - // 消费分组名 - props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 是否自动提交offset,默认就是true props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + // 消费分组名 + props.put(ConsumerConfig.GROUP_ID_CONFIG, "dsideal-group"); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, "dsideal1"); // 自动提交offset的间隔时间 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); -// props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + + // props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); /* 当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费 latest(默认) :只消费自己启动之后发送到主题的消息 @@ -67,7 +58,6 @@ public class kafkaConsumerTest { */ props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000); - //一次poll最大拉取消息的条数,如果消费者处理速度很快,可以设置大点,如果处理速度一般,可以设置小点 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); /* @@ -75,14 +65,22 @@ public class kafkaConsumerTest { 会将其踢出消费组,将分区分配给别的consumer消费 */ props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - KafkaConsumer consumer = new KafkaConsumer(props); + return props; + } + + public static void main(String[] args) throws ParseException { + String TOPIC_NAME = "test"; + KafkaConsumer consumer = new KafkaConsumer<>(initConfig()); + + //如果使用自动分区分配,那就不要使用手动分区分配,反之亦然,两者不可同时使用,使用一种方法,就注释掉另一种方法。 + //如使用手动分区分配,那么注释掉自动分区分配。 + //https://blog.csdn.net/qq_41413743/article/details/123636586 + // consumer.subscribe(Arrays.asList(TOPIC_NAME)); - consumer.subscribe(Arrays.asList(TOPIC_NAME)); // 消费指定分区 - //consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); + consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); //消息回溯消费 /*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); @@ -93,8 +91,9 @@ public class kafkaConsumerTest { consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);*/ //从指定时间点开始消费 + List topicPartitions = consumer.partitionsFor(TOPIC_NAME); - /*List topicPartitions = consumer.partitionsFor(TOPIC_NAME); + /* //从1小时前开始消费 long fetchDataTime = new Date().getTime() - 1000 * 60 * 60; Map map = new HashMap<>(); @@ -114,8 +113,8 @@ public class kafkaConsumerTest { consumer.assign(Arrays.asList(key)); consumer.seek(key, offset); } - }*/ - + } + */ while (true) { /* @@ -147,4 +146,3 @@ public class kafkaConsumerTest { } } } -//https://blog.csdn.net/qq_39839075/article/details/105522855