From 9bfa730dbd1bc58ac63cadeee133b991aa315aff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com> Date: Tue, 30 May 2023 11:17:55 +0800 Subject: [PATCH] 'commit' --- .../java/UnitTest/OpenGaussReplicationToKafka.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/main/java/UnitTest/OpenGaussReplicationToKafka.java b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java index 9703cd5..b8b4a25 100644 --- a/src/main/java/UnitTest/OpenGaussReplicationToKafka.java +++ b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java @@ -62,10 +62,10 @@ public class OpenGaussReplicationToKafka { System.out.println(list); } - public static String GetRestartLsn(String slotName) { - String sql = "select restart_lsn from pg_replication_slots where slot_name=?"; + public static String GetLsn(String slotName) { + String sql = "select confirmed_flush from pg_replication_slots where slot_name=?"; List list = Db.find(sql, slotName); - if (list.size() > 0) return list.get(0).getStr("restart_lsn"); + if (list.size() > 0) return list.get(0).getStr("confirmed_flush"); return null; } @@ -97,13 +97,13 @@ public class OpenGaussReplicationToKafka { // 查看槽 //ListSlot(); - TruncateTable(); + //TruncateTable(); //插入测试数据 - InsertTestData(); + //InsertTestData(); //获取最后的读取偏移位置 - String lsn = GetRestartLsn(slotName); + String lsn = GetLsn(slotName); Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS); @@ -156,6 +156,7 @@ public class OpenGaussReplicationToKafka { //feedback stream.setAppliedLSN(stream.getLastReceiveLSN()); stream.setFlushedLSN(stream.getLastReceiveLSN()); + stream.forceUpdateStatus(); } } catch (Exception err) { if (stream != null) {