diff --git a/src/main/java/UnitTest/kafkaConsumerTest.java b/src/main/java/UnitTest/kafkaConsumerTest.java index efaa484..29eb23e 100644 --- a/src/main/java/UnitTest/kafkaConsumerTest.java +++ b/src/main/java/UnitTest/kafkaConsumerTest.java @@ -56,10 +56,7 @@ public class kafkaConsumerTest { //一次poll最大拉取消息的条数,如果消费者处理速度很快,可以设置大点,如果处理速度一般,可以设置小点 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); - /* - 如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱, - 会将其踢出消费组,将分区分配给别的consumer消费 - */ + //如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费 props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); @@ -150,7 +147,6 @@ public class kafkaConsumerTest { if (records.count() > 0) { System.out.println("将插入数据条数:" + writeList.size()); - // 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑 consumer.commitAsync((offsets, exception) -> { if (exception != null) { System.err.println("Commit failed for " + offsets);