main
黄海 2 years ago
parent 2397bf1b5c
commit 9bfa730dbd

@ -62,10 +62,10 @@ public class OpenGaussReplicationToKafka {
System.out.println(list); System.out.println(list);
} }
public static String GetRestartLsn(String slotName) { public static String GetLsn(String slotName) {
String sql = "select restart_lsn from pg_replication_slots where slot_name=?"; String sql = "select confirmed_flush from pg_replication_slots where slot_name=?";
List<Record> list = Db.find(sql, slotName); List<Record> list = Db.find(sql, slotName);
if (list.size() > 0) return list.get(0).getStr("restart_lsn"); if (list.size() > 0) return list.get(0).getStr("confirmed_flush");
return null; return null;
} }
@ -97,13 +97,13 @@ public class OpenGaussReplicationToKafka {
// 查看槽 // 查看槽
//ListSlot(); //ListSlot();
TruncateTable(); //TruncateTable();
//插入测试数据 //插入测试数据
InsertTestData(); //InsertTestData();
//获取最后的读取偏移位置 //获取最后的读取偏移位置
String lsn = GetRestartLsn(slotName); String lsn = GetLsn(slotName);
Properties props = new Properties(); Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS);
@ -156,6 +156,7 @@ public class OpenGaussReplicationToKafka {
//feedback //feedback
stream.setAppliedLSN(stream.getLastReceiveLSN()); stream.setAppliedLSN(stream.getLastReceiveLSN());
stream.setFlushedLSN(stream.getLastReceiveLSN()); stream.setFlushedLSN(stream.getLastReceiveLSN());
stream.forceUpdateStatus();
} }
} catch (Exception err) { } catch (Exception err) {
if (stream != null) { if (stream != null) {

Loading…
Cancel
Save