main
黄海 2 years ago
parent b2b639f7e3
commit b479a69c4c

@ -1,5 +1,6 @@
package UnitTest;
import com.alibaba.fastjson.JSONObject;
import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.Record;
@ -177,6 +178,10 @@ public class OpenGaussReplicationToKafka {
String res = new String(source, offset, length);
if (res.equals("BEGIN")) continue;
if (res.startsWith("COMMIT")) continue;
JSONObject jo = JSONObject.parseObject(res);
jo.put("LSN", key);
res = jo.toString();
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, res);
kafkaProducer.send(record);
System.out.println("send ok ==> " + res);

Loading…
Cancel
Save