From b479a69c4c520dab9c18c5b2dac8fea44ea74f2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com> Date: Tue, 30 May 2023 15:00:40 +0800 Subject: [PATCH] 'commit' --- src/main/java/UnitTest/OpenGaussReplicationToKafka.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/UnitTest/OpenGaussReplicationToKafka.java b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java index 0c6a823..dc0f41f 100644 --- a/src/main/java/UnitTest/OpenGaussReplicationToKafka.java +++ b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java @@ -1,5 +1,6 @@ package UnitTest; +import com.alibaba.fastjson.JSONObject; import com.jfinal.plugin.activerecord.ActiveRecordPlugin; import com.jfinal.plugin.activerecord.Db; import com.jfinal.plugin.activerecord.Record; @@ -177,6 +178,10 @@ public class OpenGaussReplicationToKafka { String res = new String(source, offset, length); if (res.equals("BEGIN")) continue; if (res.startsWith("COMMIT")) continue; + JSONObject jo = JSONObject.parseObject(res); + jo.put("LSN", key); + res = jo.toString(); + ProducerRecord record = new ProducerRecord<>(TOPIC, res); kafkaProducer.send(record); System.out.println("send ok ==> " + res);