main
黄海 2 years ago
parent 2b22be19af
commit ca1d6e6166

@ -1,5 +1,6 @@
package UnitTest; package UnitTest;
import com.alibaba.fastjson.JSONObject;
import com.jfinal.plugin.activerecord.ActiveRecordPlugin; import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
import com.jfinal.plugin.activerecord.Db; import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.Record; import com.jfinal.plugin.activerecord.Record;
@ -41,7 +42,7 @@ public class OpenGaussReplicationToKafka {
// 槽名 // 槽名
public static final String slotName = "slot_huanghai"; public static final String slotName = "slot_huanghai2";
public static void Init() { public static void Init() {
//读取库 //读取库
@ -104,16 +105,16 @@ public class OpenGaussReplicationToKafka {
Init(); Init();
// 删除槽 // 删除槽
// DeleteSlot(slotName); DeleteSlot(slotName);
// 创建槽 // 创建槽
//CreateSlot(slotName); CreateSlot(slotName);
// 查看槽 // 查看槽
//ListSlot(); //ListSlot();
//TruncateTable(); //TruncateTable();
//插入测试数据 //插入测试数据
//InsertTestData(); InsertTestData();
//获取最后的读取偏移位置 //获取最后的读取偏移位置
String lsn = GetRestartLsn(slotName); String lsn = GetRestartLsn(slotName);
@ -171,6 +172,9 @@ public class OpenGaussReplicationToKafka {
String res = new String(source, offset, length); String res = new String(source, offset, length);
if (res.equals("BEGIN")) continue; if (res.equals("BEGIN")) continue;
if (res.startsWith("COMMIT")) continue; if (res.startsWith("COMMIT")) continue;
JSONObject jo = JSONObject.parseObject(res);
jo.put("LSN", key);
res = jo.toString();
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, res); ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, res);
kafkaProducer.send(record); kafkaProducer.send(record);
System.out.println("send ok ==> " + res); System.out.println("send ok ==> " + res);

Loading…
Cancel
Save