diff --git a/src/main/java/UnitTest/OpenGaussReplicationToKafka.java b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java index 0c6a823..dc0f41f 100644 --- a/src/main/java/UnitTest/OpenGaussReplicationToKafka.java +++ b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java @@ -1,5 +1,6 @@ package UnitTest; +import com.alibaba.fastjson.JSONObject; import com.jfinal.plugin.activerecord.ActiveRecordPlugin; import com.jfinal.plugin.activerecord.Db; import com.jfinal.plugin.activerecord.Record; @@ -177,6 +178,10 @@ public class OpenGaussReplicationToKafka { String res = new String(source, offset, length); if (res.equals("BEGIN")) continue; if (res.startsWith("COMMIT")) continue; + JSONObject jo = JSONObject.parseObject(res); + jo.put("LSN", key); + res = jo.toString(); + ProducerRecord record = new ProducerRecord<>(TOPIC, res); kafkaProducer.send(record); System.out.println("send ok ==> " + res);