diff --git a/src/main/java/UnitTest/OpenGaussReplicationToKafka.java b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java index 6be7aab..e16a3e5 100644 --- a/src/main/java/UnitTest/OpenGaussReplicationToKafka.java +++ b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java @@ -1,5 +1,6 @@ package UnitTest; +import com.alibaba.fastjson.JSONObject; import com.jfinal.plugin.activerecord.ActiveRecordPlugin; @@ -316,6 +317,10 @@ public class OpenGaussReplicationToKafka { if (res.startsWith("COMMIT")) continue; + JSONObject jo = JSONObject.parseObject(res); + jo.put("LSN", key); + res = jo.toString(); + ProducerRecord record = new ProducerRecord<>(TOPIC, res); kafkaProducer.send(record);