main
黄海 2 years ago
parent e5b668fee1
commit 993e685d37

@ -1,25 +1,25 @@
package UnitTest;
import com.alibaba.fastjson.JSONObject;
import com.aspose.slides.Collections.ArrayList;
import com.dsideal.FengHuang.Util.CommonUtil;
import com.jfinal.kit.PathKit;
import com.jfinal.kit.PropKit;
import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.Record;
import com.jfinal.plugin.druid.DruidPlugin;
import com.jfinal.plugin.redis.RedisPlugin;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.File;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class kafkaConsumerTest {
@ -131,7 +131,7 @@ public class kafkaConsumerTest {
while (true) {
List<Record> writeList = new ArrayList();
List<Long> delList = new ArrayList();
List<String> delList = new ArrayList();
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
//kafka中的offset是一个64位的有符号整数
@ -144,33 +144,20 @@ public class kafkaConsumerTest {
r.set("table", jo.get("table"));
r.set("type", jo.get("type"));
writeList.add(r);
delList.add(record.offset());
String sql = "delete from t_sync_log where offset = " + record.offset();
delList.add(sql);
}
if (records.count() > 0) {
// 手动同步提交offset当前线程会阻塞直到offset提交成功
// 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了
//consumer.commitSync();
System.out.println("将插入数据条数:"+writeList.size());
System.out.println("将插入数据条数:" + writeList.size());
// 手动异步提交offset当前线程提交offset不会阻塞可以继续处理后面的程序逻辑
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
System.err.println("Commit failed for " + offsets);
System.err.println("Commit failed exception: " + exception.getStackTrace());
} else {
if (writeList.size() > 0) {
String str = "";
for (Long aLong : delList) {
str = str + "," + aLong;
}
if (str.length() > 0) {
str = str.substring(1);
String sql = "delete from t_sync_log where offset in (" + str + ")";
Db.update(sql);
}
Db.batchSave("t_sync_log", writeList, 100);
}
if (delList.size() > 0) Db.batch(delList, 100);
if (writeList.size() > 0) Db.batchSave("t_sync_log", writeList, 100);
}
});
}

Loading…
Cancel
Save