|
|
|
@ -56,10 +56,7 @@ public class kafkaConsumerTest {
|
|
|
|
|
|
|
|
|
|
//一次poll最大拉取消息的条数,如果消费者处理速度很快,可以设置大点,如果处理速度一般,可以设置小点
|
|
|
|
|
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
|
|
|
|
|
/*
|
|
|
|
|
如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,
|
|
|
|
|
会将其踢出消费组,将分区分配给别的consumer消费
|
|
|
|
|
*/
|
|
|
|
|
//如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费
|
|
|
|
|
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
|
|
|
|
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
|
|
|
|
|
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
|
|
|
|
@ -150,7 +147,6 @@ public class kafkaConsumerTest {
|
|
|
|
|
|
|
|
|
|
if (records.count() > 0) {
|
|
|
|
|
System.out.println("将插入数据条数:" + writeList.size());
|
|
|
|
|
// 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑
|
|
|
|
|
consumer.commitAsync((offsets, exception) -> {
|
|
|
|
|
if (exception != null) {
|
|
|
|
|
System.err.println("Commit failed for " + offsets);
|
|
|
|
|