|
|
|
@ -62,6 +62,13 @@ public class OpenGaussReplicationToKafka {
|
|
|
|
|
System.out.println(list);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static String GetLsn(String slotName) {
|
|
|
|
|
String sql = "select confirmed_flush from pg_replication_slots where slot_name=?";
|
|
|
|
|
List<Record> list = Db.find(sql, slotName);
|
|
|
|
|
if (list.size() > 0) return list.get(0).getStr("confirmed_flush");
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static String GetRestartLsn(String slotName) {
|
|
|
|
|
String sql = "select restart_lsn from pg_replication_slots where slot_name=?";
|
|
|
|
|
List<Record> list = Db.find(sql, slotName);
|
|
|
|
@ -97,13 +104,15 @@ public class OpenGaussReplicationToKafka {
|
|
|
|
|
// 查看槽
|
|
|
|
|
//ListSlot();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// TruncateTable();
|
|
|
|
|
|
|
|
|
|
//插入测试数据
|
|
|
|
|
// InsertTestData();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//获取最后的读取偏移位置
|
|
|
|
|
String lsn = GetRestartLsn(slotName);
|
|
|
|
|
String lsn = GetLsn(slotName);
|
|
|
|
|
|
|
|
|
|
Properties props = new Properties();
|
|
|
|
|
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS);
|
|
|
|
@ -154,9 +163,10 @@ public class OpenGaussReplicationToKafka {
|
|
|
|
|
kafkaProducer.send(record);
|
|
|
|
|
System.out.println("send ok ==> " + res);
|
|
|
|
|
//feedback
|
|
|
|
|
// stream.setAppliedLSN(stream.getLastReceiveLSN());
|
|
|
|
|
stream.setAppliedLSN(stream.getLastReceiveLSN());
|
|
|
|
|
stream.setFlushedLSN(stream.getLastReceiveLSN());
|
|
|
|
|
System.out.println(GetRestartLsn(slotName));
|
|
|
|
|
System.out.println(stream.getLastReceiveLSN());
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
} catch (Exception err) {
|
|
|
|
|
if (stream != null) {
|
|
|
|
|