diff --git a/src/main/java/UnitTest/OpenGaussReplicationToKafka.java b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java index 0691d55..1cfa1d5 100644 --- a/src/main/java/UnitTest/OpenGaussReplicationToKafka.java +++ b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java @@ -62,6 +62,13 @@ public class OpenGaussReplicationToKafka { System.out.println(list); } + public static String GetLsn(String slotName) { + String sql = "select confirmed_flush from pg_replication_slots where slot_name=?"; + List list = Db.find(sql, slotName); + if (list.size() > 0) return list.get(0).getStr("confirmed_flush"); + return null; + } + public static String GetRestartLsn(String slotName) { String sql = "select restart_lsn from pg_replication_slots where slot_name=?"; List list = Db.find(sql, slotName); @@ -97,13 +104,15 @@ public class OpenGaussReplicationToKafka { // 查看槽 //ListSlot(); + // TruncateTable(); //插入测试数据 // InsertTestData(); + //获取最后的读取偏移位置 - String lsn = GetRestartLsn(slotName); + String lsn = GetLsn(slotName); Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS); @@ -154,9 +163,10 @@ public class OpenGaussReplicationToKafka { kafkaProducer.send(record); System.out.println("send ok ==> " + res); //feedback -// stream.setAppliedLSN(stream.getLastReceiveLSN()); + stream.setAppliedLSN(stream.getLastReceiveLSN()); stream.setFlushedLSN(stream.getLastReceiveLSN()); - System.out.println(GetRestartLsn(slotName)); + System.out.println(stream.getLastReceiveLSN()); + } } catch (Exception err) { if (stream != null) {