package UnitTest;
import com.dsideal.FengHuang.Util.CommonUtil;
import com.jfinal.kit.StrKit;
import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.Record;
import com.jfinal.plugin.activerecord.dialect.PostgreSqlDialect;
import com.jfinal.plugin.druid.DruidPlugin;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.postgresql.PGProperty;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;
import java.nio.ByteBuffer;
import java.sql.DriverManager;
import java.util.*;
import java.util.concurrent.TimeUnit;
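
/**
 * Demo: stream logical replication changes from an openGauss database to Kafka.
 * A logical replication slot is created with the mppdb_decoding output plugin,
 * decoded changes are read through the PostgreSQL JDBC replication API, and
 * each change record is published as a string message to a Kafka topic.
 */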
public class OpenGaussReplicationToKafka {
    public static String SOURCE_URL = "jdbc:postgresql://10.10.14.61:15400/test_db";
    public static String USER = "postgres";
    public static String PASSWD = "DsideaL147258369";
    public static String DRIVER_CLASS = "org.postgresql.Driver";
    public static String TOPIC = "pg_test"; // Kafka topic to publish decoded changes to
    public static final String BROKERS_ADDRESS = "10.10.14.67:9092";

    public static void Init() {
        // Set up the connection pool and ActiveRecord plugin for the source database
        DruidPlugin druid = new DruidPlugin(SOURCE_URL, USER, PASSWD, DRIVER_CLASS);
        druid.start();
        ActiveRecordPlugin arp = new ActiveRecordPlugin(druid);
        arp.setDialect(new PostgreSqlDialect());
        arp.start();
    }
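
    /**
     * Creates a logical replication slot that decodes WAL with the
     * mppdb_decoding output plugin (openGauss's logical decoding plugin).
     */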
    public static void CreateSlot(String slotName) {
        String sql = "select * from pg_create_logical_replication_slot(?, 'mppdb_decoding')";
        Db.find(sql, slotName);
    }
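
    /**
     * Drops the replication slot, ignoring the error if it does not exist.
     */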
    public static void DeleteSlot(String slotName) {
        try {
            String sql = "select pg_drop_replication_slot(?)";
            Db.find(sql, slotName);
        } catch (Exception err) {
            System.out.println(err);
        }
    }
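
    /**
     * Prints all replication slots (pg_replication_slots) for inspection.
     */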
    public static void ListSlot() {
        String sql = "select * from pg_replication_slots";
        List<Record> list = Db.find(sql);
        System.out.println(list);
    }
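
    /**
     * Returns the restart_lsn of the given slot, i.e. the WAL position from
     * which logical decoding will resume, or null if the slot is not found.
     */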
    public static String GetRestartLsn(String slotName) {
        String sql = "select restart_lsn from pg_replication_slots where slot_name=?";
        List<Record> list = Db.find(sql, slotName);
        if (list.size() > 0) return list.get(0).getStr("restart_lsn");
        return null;
    }
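
    /**
     * Inserts one row into the test table with an incrementing id, so that
     * the replication stream has a change to decode.
     */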
    public static void InsertTestData() {
        String sql = "select max(id) as maxid from test";
        int maxId = 0;
        Record record = Db.findFirst(sql);
        if (record.get("maxid") != null) maxId = record.getInt("maxid");
        maxId = maxId + 1;
        sql = "insert into test(id,txt) values(?,?)";
        Db.update(sql, maxId, "Huang Hai's test data: " + maxId);
    }
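
    /**
     * Empties the test table.
     */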
    public static void TruncateTable() {
        String sql = "truncate table test";
        Db.update(sql);
    }
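
    /**
     * End-to-end flow: recreate the slot, insert a test row, open a logical
     * replication stream from the slot's restart_lsn, and forward each
     * decoded change record to the Kafka topic.
     */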
    public static void main(String[] args) throws Exception {
        // Initialize the database connection
        Init();
        // Replication slot name
        String slotName = "slot2";
        // Drop the slot if it already exists, then recreate it
        DeleteSlot(slotName);
        CreateSlot(slotName);
        // Inspect existing slots / clear the test table if needed
        //ListSlot();
        //TruncateTable();
        // Insert test data so there is something to decode
        InsertTestData();
        // Get the last read offset (restart LSN) of the slot
        String lsn = GetRestartLsn(slotName);

        // Kafka producer used to publish the decoded changes
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        // A replication connection requires the replication=database and
        // simple-query-mode properties of the PostgreSQL JDBC driver
        Properties properties = new Properties();
        PGProperty.USER.set(properties, USER);
        PGProperty.PASSWORD.set(properties, PASSWD);
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
        PGProperty.REPLICATION.set(properties, "database");
        PGProperty.PREFER_QUERY_MODE.set(properties, "simple");
        Class.forName(DRIVER_CLASS);
        PgConnection conn = (PgConnection) DriverManager.getConnection(SOURCE_URL, properties);
        System.out.println("connection success!");

        LogSequenceNumber waitLSN = LogSequenceNumber.valueOf(lsn);
        PGReplicationStream stream = conn
                .getReplicationAPI()
                .replicationStream()
                .logical()
                .withSlotName(slotName)
                .withSlotOption("include-xids", false)
                .withSlotOption("skip-empty-xacts", true)
                .withStartPosition(waitLSN)
                .start();
        System.out.println("Starting LSN for this round: " + lsn);
        try {
            while (true) {
                ByteBuffer byteBuffer = stream.readPending();
                if (byteBuffer == null) {
                    // No pending data; back off briefly instead of busy-waiting
                    TimeUnit.MILLISECONDS.sleep(10L);
                    continue;
                }
                int offset = byteBuffer.arrayOffset();
                byte[] source = byteBuffer.array();
                int length = source.length - offset;
                String res = new String(source, offset, length);
                // Skip transaction boundary markers; only row changes are forwarded
                if (res.equals("BEGIN")) continue;
                if (res.startsWith("COMMIT")) continue;
                ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, res);
                kafkaProducer.send(record);
                System.out.println("send ok ==> " + res);
                // Feedback: report the last received LSN as applied/flushed so the
                // server can advance the slot and recycle old WAL segments
                stream.setAppliedLSN(stream.getLastReceiveLSN());
                stream.setFlushedLSN(stream.getLastReceiveLSN());
            }
        } catch (Exception err) {
            err.printStackTrace();
        } finally {
            if (stream != null) stream.close();
            if (conn != null) conn.close();
            kafkaProducer.close();
        }
    }
}