|
|
@ -41,147 +41,81 @@ import java.util.Properties;
|
|
|
|
|
|
|
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public class OpenGaussReplicationToKafka {
|
|
|
|
public class OpenGaussReplicationToKafka {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public static String SOURCE_URL = "jdbc:postgresql://10.10.14.61:15400/test_db";
|
|
|
|
public static String SOURCE_URL = "jdbc:postgresql://10.10.14.61:15400/test_db";
|
|
|
|
|
|
|
|
|
|
|
|
public static String USER = "postgres";
|
|
|
|
public static String USER = "postgres";
|
|
|
|
|
|
|
|
|
|
|
|
public static String PASSWD = "DsideaL147258369";
|
|
|
|
public static String PASSWD = "DsideaL147258369";
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public static String DRIVER_CLASS = "org.postgresql.Driver";
|
|
|
|
public static String DRIVER_CLASS = "org.postgresql.Driver";
|
|
|
|
|
|
|
|
|
|
|
|
public static String TOPIC = "pg_test";//定义主题
|
|
|
|
public static String TOPIC = "pg_test";//定义主题
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//KAFKA
|
|
|
|
//KAFKA
|
|
|
|
|
|
|
|
|
|
|
|
public static final String BROKERS_ADDRESS = "10.10.14.67:9092";
|
|
|
|
public static final String BROKERS_ADDRESS = "10.10.14.67:9092";
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//REDIS
|
|
|
|
//REDIS
|
|
|
|
|
|
|
|
|
|
|
|
public static final String REDIS_HOST = "10.10.14.199";
|
|
|
|
public static final String REDIS_HOST = "10.10.14.199";
|
|
|
|
|
|
|
|
|
|
|
|
public static final int REDIS_PORT = 18890;
|
|
|
|
public static final int REDIS_PORT = 18890;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public static final int EXPIRE_SECONDS = 3600 * 24 * 3;
|
|
|
|
public static final int EXPIRE_SECONDS = 3600 * 24 * 3;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 槽名
|
|
|
|
// 槽名
|
|
|
|
|
|
|
|
|
|
|
|
public static final String slotName = "slot_huanghai";
|
|
|
|
public static final String slotName = "slot_huanghai";
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public static void Init() {
|
|
|
|
public static void Init() {
|
|
|
|
|
|
|
|
|
|
|
|
//读取库
|
|
|
|
//读取库
|
|
|
|
|
|
|
|
|
|
|
|
DruidPlugin druid = new DruidPlugin(SOURCE_URL, USER, PASSWD, DRIVER_CLASS);
|
|
|
|
DruidPlugin druid = new DruidPlugin(SOURCE_URL, USER, PASSWD, DRIVER_CLASS);
|
|
|
|
|
|
|
|
|
|
|
|
druid.start();
|
|
|
|
druid.start();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ActiveRecordPlugin arp = new ActiveRecordPlugin(druid);
|
|
|
|
ActiveRecordPlugin arp = new ActiveRecordPlugin(druid);
|
|
|
|
|
|
|
|
|
|
|
|
arp.setDialect(new PostgreSqlDialect());
|
|
|
|
arp.setDialect(new PostgreSqlDialect());
|
|
|
|
|
|
|
|
|
|
|
|
arp.start();
|
|
|
|
arp.start();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 用于缓存模块的redis服务
|
|
|
|
// 用于缓存模块的redis服务
|
|
|
|
|
|
|
|
|
|
|
|
RedisPlugin redis = new RedisPlugin("myRedis", REDIS_HOST, REDIS_PORT, 10 * 1000);
|
|
|
|
RedisPlugin redis = new RedisPlugin("myRedis", REDIS_HOST, REDIS_PORT, 10 * 1000);
|
|
|
|
|
|
|
|
|
|
|
|
redis.start();
|
|
|
|
redis.start();
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public static void CreateSlot(String slotName) {
|
|
|
|
public static void CreateSlot(String slotName) {
|
|
|
|
|
|
|
|
|
|
|
|
String sql = "select * from pg_create_logical_replication_slot(?, 'mppdb_decoding')";
|
|
|
|
String sql = "select * from pg_create_logical_replication_slot(?, 'mppdb_decoding')";
|
|
|
|
|
|
|
|
|
|
|
|
Db.find(sql, slotName);
|
|
|
|
Db.find(sql, slotName);
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public static void DeleteSlot(String slotName) {
|
|
|
|
public static void DeleteSlot(String slotName) {
|
|
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
|
|
|
|
|
|
|
|
String sql = "select pg_drop_replication_slot(?)";
|
|
|
|
String sql = "select pg_drop_replication_slot(?)";
|
|
|
|
|
|
|
|
|
|
|
|
Db.find(sql, slotName);
|
|
|
|
Db.find(sql, slotName);
|
|
|
|
|
|
|
|
|
|
|
|
} catch (Exception err) {
|
|
|
|
} catch (Exception err) {
|
|
|
|
|
|
|
|
|
|
|
|
System.out.println(err);
|
|
|
|
System.out.println(err);
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public static void ListSlot() {
|
|
|
|
public static void ListSlot() {
|
|
|
|
|
|
|
|
|
|
|
|
String sql = "select * from pg_replication_slots";
|
|
|
|
String sql = "select * from pg_replication_slots";
|
|
|
|
|
|
|
|
|
|
|
|
List<Record> list = Db.find(sql);
|
|
|
|
List<Record> list = Db.find(sql);
|
|
|
|
|
|
|
|
|
|
|
|
System.out.println(list);
|
|
|
|
System.out.println(list);
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public static String GetRestartLsn(String slotName) {
|
|
|
|
public static String GetRestartLsn(String slotName) {
|
|
|
|
|
|
|
|
|
|
|
|
String sql = "select restart_lsn from pg_replication_slots where slot_name=?";
|
|
|
|
String sql = "select restart_lsn from pg_replication_slots where slot_name=?";
|
|
|
|
|
|
|
|
|
|
|
|
List<Record> list = Db.find(sql, slotName);
|
|
|
|
List<Record> list = Db.find(sql, slotName);
|
|
|
|
|
|
|
|
|
|
|
|
if (list.size() > 0) return list.get(0).getStr("restart_lsn");
|
|
|
|
if (list.size() > 0) return list.get(0).getStr("restart_lsn");
|
|
|
|
|
|
|
|
|
|
|
|
return null;
|
|
|
|
return null;
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public static void InsertTestData() {
|
|
|
|
public static void InsertTestData() {
|
|
|
|
|
|
|
|
|
|
|
|
String sql = "select max(id) as maxid from test";
|
|
|
|
String sql = "select max(id) as maxid from test";
|
|
|
|
|
|
|
|
|
|
|
|
int maxId = 0;
|
|
|
|
int maxId = 0;
|
|
|
|
|
|
|
|
|
|
|
|
Record record = Db.findFirst(sql);
|
|
|
|
Record record = Db.findFirst(sql);
|
|
|
|
|
|
|
|
|
|
|
|
if (record.get("maxid") != null) maxId = record.getInt("maxid");
|
|
|
|
if (record.get("maxid") != null) maxId = record.getInt("maxid");
|
|
|
|
|
|
|
|
|
|
|
|
maxId = maxId + 1;
|
|
|
|
maxId = maxId + 1;
|
|
|
|
|
|
|
|
|
|
|
|
sql = "insert into test(id,txt) values(?,?)";
|
|
|
|
sql = "insert into test(id,txt) values(?,?)";
|
|
|
|
|
|
|
|
|
|
|
|
Db.update(sql, maxId, "黄海的测试数据:" + maxId);
|
|
|
|
Db.update(sql, maxId, "黄海的测试数据:" + maxId);
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public static void TruncateTable() {
|
|
|
|
public static void TruncateTable() {
|
|
|
|
|
|
|
|
|
|
|
|
String sql = "truncate table test";
|
|
|
|
String sql = "truncate table test";
|
|
|
|
|
|
|
|
|
|
|
|
Db.update(sql);
|
|
|
|
Db.update(sql);
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public static void main(String[] args) throws Exception {
|
|
|
|
public static void main(String[] args) throws Exception {
|
|
|
|
|
|
|
|
|
|
|
|
//初始化数据库链接
|
|
|
|
//初始化数据库链接
|
|
|
|
Init();
|
|
|
|
Init();
|
|
|
|
// 删除槽
|
|
|
|
// 删除槽
|
|
|
@ -196,143 +130,73 @@ public class OpenGaussReplicationToKafka {
|
|
|
|
//获取最后的读取偏移位置
|
|
|
|
//获取最后的读取偏移位置
|
|
|
|
|
|
|
|
|
|
|
|
String lsn = GetRestartLsn(slotName);
|
|
|
|
String lsn = GetRestartLsn(slotName);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Properties props = new Properties();
|
|
|
|
Properties props = new Properties();
|
|
|
|
|
|
|
|
|
|
|
|
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS);
|
|
|
|
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS);
|
|
|
|
|
|
|
|
|
|
|
|
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
|
|
|
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
|
|
|
|
|
|
|
|
|
|
|
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
|
|
|
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
|
|
|
|
|
|
|
|
|
|
|
KafkaProducer<String, String> kafkaProducer = new KafkaProducer(props);
|
|
|
|
KafkaProducer<String, String> kafkaProducer = new KafkaProducer(props);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Properties properties = new Properties();
|
|
|
|
Properties properties = new Properties();
|
|
|
|
|
|
|
|
|
|
|
|
PGProperty.USER.set(properties, USER);
|
|
|
|
PGProperty.USER.set(properties, USER);
|
|
|
|
|
|
|
|
|
|
|
|
PGProperty.PASSWORD.set(properties, PASSWD);
|
|
|
|
PGProperty.PASSWORD.set(properties, PASSWD);
|
|
|
|
|
|
|
|
|
|
|
|
PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
|
|
|
|
PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
|
|
|
|
|
|
|
|
|
|
|
|
PGProperty.REPLICATION.set(properties, "database");
|
|
|
|
PGProperty.REPLICATION.set(properties, "database");
|
|
|
|
|
|
|
|
|
|
|
|
PGProperty.PREFER_QUERY_MODE.set(properties, "simple");
|
|
|
|
PGProperty.PREFER_QUERY_MODE.set(properties, "simple");
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Class.forName(DRIVER_CLASS);
|
|
|
|
Class.forName(DRIVER_CLASS);
|
|
|
|
|
|
|
|
|
|
|
|
PgConnection conn = (PgConnection) DriverManager.getConnection(SOURCE_URL, properties);
|
|
|
|
PgConnection conn = (PgConnection) DriverManager.getConnection(SOURCE_URL, properties);
|
|
|
|
|
|
|
|
|
|
|
|
System.out.println("connection success!");
|
|
|
|
System.out.println("connection success!");
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
LogSequenceNumber waitLSN = LogSequenceNumber.valueOf(lsn);
|
|
|
|
LogSequenceNumber waitLSN = LogSequenceNumber.valueOf(lsn);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
PGReplicationStream stream = conn
|
|
|
|
PGReplicationStream stream = conn
|
|
|
|
|
|
|
|
|
|
|
|
.getReplicationAPI()
|
|
|
|
.getReplicationAPI()
|
|
|
|
|
|
|
|
|
|
|
|
.replicationStream()
|
|
|
|
.replicationStream()
|
|
|
|
|
|
|
|
|
|
|
|
.logical()
|
|
|
|
.logical()
|
|
|
|
|
|
|
|
|
|
|
|
.withSlotName(slotName)
|
|
|
|
.withSlotName(slotName)
|
|
|
|
|
|
|
|
|
|
|
|
.withStatusInterval(10, TimeUnit.SECONDS)
|
|
|
|
.withStatusInterval(10, TimeUnit.SECONDS)
|
|
|
|
|
|
|
|
|
|
|
|
.withSlotOption("include-xids", false)
|
|
|
|
.withSlotOption("include-xids", false)
|
|
|
|
|
|
|
|
|
|
|
|
.withSlotOption("skip-empty-xacts", true)
|
|
|
|
.withSlotOption("skip-empty-xacts", true)
|
|
|
|
|
|
|
|
|
|
|
|
.withStartPosition(waitLSN)
|
|
|
|
.withStartPosition(waitLSN)
|
|
|
|
|
|
|
|
|
|
|
|
.start();
|
|
|
|
.start();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
System.out.println("本轮LSN起始位置:" + lsn);
|
|
|
|
System.out.println("本轮LSN起始位置:" + lsn);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
|
|
|
|
|
|
|
|
while (true) {
|
|
|
|
while (true) {
|
|
|
|
|
|
|
|
|
|
|
|
ByteBuffer byteBuffer = stream.readPending();
|
|
|
|
ByteBuffer byteBuffer = stream.readPending();
|
|
|
|
|
|
|
|
|
|
|
|
if (byteBuffer == null) {
|
|
|
|
if (byteBuffer == null) {
|
|
|
|
|
|
|
|
|
|
|
|
TimeUnit.MILLISECONDS.sleep(10L);
|
|
|
|
TimeUnit.MILLISECONDS.sleep(10L);
|
|
|
|
|
|
|
|
|
|
|
|
continue;
|
|
|
|
continue;
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
LogSequenceNumber nowLsn = stream.getLastReceiveLSN();
|
|
|
|
LogSequenceNumber nowLsn = stream.getLastReceiveLSN();
|
|
|
|
|
|
|
|
|
|
|
|
String key = "LSN_" + nowLsn.asLong();
|
|
|
|
String key = "LSN_" + nowLsn.asLong();
|
|
|
|
|
|
|
|
|
|
|
|
if (Redis.use().exists(key)) {
|
|
|
|
if (Redis.use().exists(key)) {
|
|
|
|
|
|
|
|
|
|
|
|
System.out.println("发现已经处理完成的LSN=" + key + ",将放过此记录,不再加入kafka!");
|
|
|
|
System.out.println("发现已经处理完成的LSN=" + key + ",将放过此记录,不再加入kafka!");
|
|
|
|
|
|
|
|
|
|
|
|
stream.setAppliedLSN(nowLsn);
|
|
|
|
stream.setAppliedLSN(nowLsn);
|
|
|
|
|
|
|
|
|
|
|
|
stream.setFlushedLSN(nowLsn);
|
|
|
|
stream.setFlushedLSN(nowLsn);
|
|
|
|
|
|
|
|
|
|
|
|
continue;
|
|
|
|
continue;
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int offset = byteBuffer.arrayOffset();
|
|
|
|
int offset = byteBuffer.arrayOffset();
|
|
|
|
|
|
|
|
|
|
|
|
byte[] source = byteBuffer.array();
|
|
|
|
byte[] source = byteBuffer.array();
|
|
|
|
|
|
|
|
|
|
|
|
int length = source.length - offset;
|
|
|
|
int length = source.length - offset;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
String res = new String(source, offset, length);
|
|
|
|
String res = new String(source, offset, length);
|
|
|
|
|
|
|
|
|
|
|
|
if (res.equals("BEGIN")) continue;
|
|
|
|
if (res.equals("BEGIN")) continue;
|
|
|
|
|
|
|
|
|
|
|
|
if (res.startsWith("COMMIT")) continue;
|
|
|
|
if (res.startsWith("COMMIT")) continue;
|
|
|
|
|
|
|
|
|
|
|
|
JSONObject jo = JSONObject.parseObject(res);
|
|
|
|
JSONObject jo = JSONObject.parseObject(res);
|
|
|
|
jo.put("LSN", key);
|
|
|
|
jo.put("LSN", key);
|
|
|
|
res = jo.toString();
|
|
|
|
res = jo.toString();
|
|
|
|
|
|
|
|
|
|
|
|
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, res);
|
|
|
|
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, res);
|
|
|
|
|
|
|
|
|
|
|
|
kafkaProducer.send(record);
|
|
|
|
kafkaProducer.send(record);
|
|
|
|
|
|
|
|
|
|
|
|
System.out.println("send ok ==> " + res);
|
|
|
|
System.out.println("send ok ==> " + res);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
stream.setAppliedLSN(nowLsn);
|
|
|
|
stream.setAppliedLSN(nowLsn);
|
|
|
|
|
|
|
|
|
|
|
|
stream.setFlushedLSN(nowLsn);
|
|
|
|
stream.setFlushedLSN(nowLsn);
|
|
|
|
|
|
|
|
|
|
|
|
stream.forceUpdateStatus();
|
|
|
|
stream.forceUpdateStatus();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//write to redis
|
|
|
|
//write to redis
|
|
|
|
|
|
|
|
|
|
|
|
Redis.use().setex(key, EXPIRE_SECONDS, 1);
|
|
|
|
Redis.use().setex(key, EXPIRE_SECONDS, 1);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
} catch (Exception err) {
|
|
|
|
} catch (Exception err) {
|
|
|
|
|
|
|
|
|
|
|
|
System.out.println(err);
|
|
|
|
System.out.println(err);
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|