diff --git a/src/main/java/UnitTest/OpenGaussReplicationToKafka.java b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java index eabcfcc..cfcf86a 100644 --- a/src/main/java/UnitTest/OpenGaussReplicationToKafka.java +++ b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java @@ -1,5 +1,6 @@ package UnitTest; +import com.alibaba.fastjson.JSONObject; import com.jfinal.plugin.activerecord.ActiveRecordPlugin; import com.jfinal.plugin.activerecord.Db; import com.jfinal.plugin.activerecord.Record; @@ -41,7 +42,7 @@ public class OpenGaussReplicationToKafka { // 槽名 - public static final String slotName = "slot_huanghai"; + public static final String slotName = "slot_huanghai2"; public static void Init() { //读取库 @@ -104,16 +105,16 @@ public class OpenGaussReplicationToKafka { Init(); // 删除槽 - // DeleteSlot(slotName); + DeleteSlot(slotName); // 创建槽 - //CreateSlot(slotName); + CreateSlot(slotName); // 查看槽 //ListSlot(); //TruncateTable(); //插入测试数据 - //InsertTestData(); + InsertTestData(); //获取最后的读取偏移位置 String lsn = GetRestartLsn(slotName); @@ -171,6 +172,9 @@ public class OpenGaussReplicationToKafka { String res = new String(source, offset, length); if (res.equals("BEGIN")) continue; if (res.startsWith("COMMIT")) continue; + JSONObject jo = JSONObject.parseObject(res); + jo.put("LSN", key); + res = jo.toString(); ProducerRecord record = new ProducerRecord<>(TOPIC, res); kafkaProducer.send(record); System.out.println("send ok ==> " + res);