From 993e685d374ba6b3de122996e62b128a08bbb622 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com>
Date: Wed, 10 May 2023 16:10:20 +0800
Subject: [PATCH] 'commit'

---
 src/main/java/UnitTest/kafkaConsumerTest.java | 41 +++++++------------
 1 file changed, 14 insertions(+), 27 deletions(-)

diff --git a/src/main/java/UnitTest/kafkaConsumerTest.java b/src/main/java/UnitTest/kafkaConsumerTest.java
index f06fba2..efaa484 100644
--- a/src/main/java/UnitTest/kafkaConsumerTest.java
+++ b/src/main/java/UnitTest/kafkaConsumerTest.java
@@ -1,25 +1,25 @@
 package UnitTest;
 
 import com.alibaba.fastjson.JSONObject;
-import com.aspose.slides.Collections.ArrayList;
-import com.dsideal.FengHuang.Util.CommonUtil;
-import com.jfinal.kit.PathKit;
 import com.jfinal.kit.PropKit;
 import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
 import com.jfinal.plugin.activerecord.Db;
 import com.jfinal.plugin.activerecord.Record;
 import com.jfinal.plugin.druid.DruidPlugin;
-import com.jfinal.plugin.redis.RedisPlugin;
-import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
-import java.io.File;
 import java.text.ParseException;
-import java.text.SimpleDateFormat;
 import java.time.Duration;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
 
 public class kafkaConsumerTest {
 
@@ -131,7 +131,7 @@ public class kafkaConsumerTest {
         while (true) {
 
             List writeList = new ArrayList();
-            List<Long> delList = new ArrayList();
+            List<String> delList = new ArrayList();
             ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
             for (ConsumerRecord record : records) {
                 //the offset in Kafka is a 64-bit signed integer
@@ -144,33 +144,20 @@ public class kafkaConsumerTest {
                 r.set("table", jo.get("table"));
                 r.set("type", jo.get("type"));
                 writeList.add(r);
-                delList.add(record.offset());
+                String sql = "delete from t_sync_log where offset = " + record.offset();
+                delList.add(sql);
             }
             if (records.count() > 0) {
-                // commit the offset manually and synchronously; the current thread blocks until the commit succeeds
-                // synchronous commit is the usual choice, since there is normally no further logic after the commit
-                //consumer.commitSync();
-
-                System.out.println("Number of rows to insert: "+writeList.size());
+                System.out.println("Number of rows to insert: " + writeList.size());
                 // commit the offset manually and asynchronously; the current thread is not blocked and can continue with the logic below
                 consumer.commitAsync((offsets, exception) -> {
                     if (exception != null) {
                         System.err.println("Commit failed for " + offsets);
                         System.err.println("Commit failed exception: " + exception.getStackTrace());
                     } else {
-                        if (writeList.size() > 0) {
-                            String str = "";
-                            for (Long aLong : delList) {
-                                str = str + "," + aLong;
-                            }
-                            if (str.length() > 0) {
-                                str = str.substring(1);
-                                String sql = "delete from t_sync_log where offset in (" + str + ")";
-                                Db.update(sql);
-                            }
-                            Db.batchSave("t_sync_log", writeList, 100);
-                        }
+                        if (delList.size() > 0) Db.batch(delList, 100);
+                        if (writeList.size() > 0) Db.batchSave("t_sync_log", writeList, 100);
                     }
                 });
            }
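
Note: for reference, below is a minimal standalone sketch of the consume/commit pattern this patch ends up with: buffer each polled record as a JFinal Record for insertion plus one per-offset DELETE statement, then run both batches from the commitAsync callback. It is a sketch under assumptions, not part of the patch: the class name, bootstrap servers, group id, topic name, the "offset" column, and parsing the JSON object from the message value are all placeholders or guesses; only the t_sync_log table and the Db.batch / Db.batchSave calls mirror the patched code.

package UnitTest;

import com.alibaba.fastjson.JSONObject;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.Record;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class SyncLogConsumerSketch {

    public static void main(String[] args) {
        // Assumes the JFinal ActiveRecordPlugin/DruidPlugin have already been configured
        // and started (as kafkaConsumerTest does via PropKit/DruidPlugin), so Db.* calls
        // can reach the database that holds t_sync_log.
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sync-log-group");          // placeholder
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         // offsets are committed manually
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("t_sync_log_topic"));               // topic name is an assumption

        while (true) {
            List<Record> writeList = new ArrayList<>();
            List<String> delList = new ArrayList<>();
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // The patch reads "table"/"type" from a JSON object; parsing it from the
                // message value is an assumption about where that object comes from.
                JSONObject jo = JSONObject.parseObject(record.value());
                Record r = new Record();
                r.set("offset", record.offset());                             // assumed column, implied by the DELETE below
                r.set("table", jo.get("table"));
                r.set("type", jo.get("type"));
                writeList.add(r);
                // one DELETE statement per record; executed in batches of 100 below
                delList.add("delete from t_sync_log where offset = " + record.offset());
            }
            if (records.count() > 0) {
                // Asynchronous commit: the poll loop is not blocked; DB work runs in the callback.
                consumer.commitAsync((offsets, exception) -> {
                    if (exception != null) {
                        System.err.println("Commit failed for " + offsets + ": " + exception);
                    } else {
                        if (delList.size() > 0) Db.batch(delList, 100);
                        if (writeList.size() > 0) Db.batchSave("t_sync_log", writeList, 100);
                    }
                });
            }
        }
    }
}

Compared with the removed IN-list approach, building one DELETE per record keeps both the deletes and the inserts on JFinal's JDBC batching path (Db.batch / Db.batchSave) and avoids assembling a comma-separated string by hand.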