diff --git a/src/main/java/UnitTest/OpenGaussReplication.java b/src/main/java/UnitTest/OpenGaussReplication.java
deleted file mode 100644
index 0084c2a..0000000
--- a/src/main/java/UnitTest/OpenGaussReplication.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package UnitTest;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.postgresql.PGProperty;
-import org.postgresql.jdbc.PgConnection;
-import org.postgresql.replication.LogSequenceNumber;
-import org.postgresql.replication.PGReplicationStream;
-
-import java.nio.ByteBuffer;
-import java.sql.DriverManager;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-public class OpenGaussReplication {
-
-    public static String SOURCEURL = "jdbc:postgresql://10.10.14.61:15400/test_db";
-    public static String USER = "postgres";
-    public static String PASSWD = "DsideaL147258369";
-
-    public static String TOPIC = "pg_test";//定义主题
-    /*
-    SELECT * FROM pg_create_logical_replication_slot('slot2', 'mppdb_decoding');
-    insert into test(id,txt) values(10,'hhhh');
-    SELECT * FROM pg_logical_slot_peek_changes('slot2', NULL, 4096);
-     */
-    public static final String BROKERS_ADDRESS = "10.10.14.67:9092";
-
-    public static void main(String[] args) throws Exception {
-
-        Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-        KafkaProducer kafkaProducer = new KafkaProducer(props);
-
-        Properties properties = new Properties();
-        PGProperty.USER.set(properties, USER);
-        PGProperty.PASSWORD.set(properties, PASSWD);
-        PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
-        PGProperty.REPLICATION.set(properties, "database");
-        PGProperty.PREFER_QUERY_MODE.set(properties, "simple");
-
-        Class.forName("org.postgresql.Driver");
-
-        PgConnection conn = (PgConnection) DriverManager.getConnection(SOURCEURL, properties);
-        System.out.println("connection success!");
-
-        String slotName = "slot2";
-        String lsn = "403ED30";
-
-        LogSequenceNumber waitLSN = LogSequenceNumber.valueOf(lsn);
-
-        PGReplicationStream stream = conn
-                .getReplicationAPI()
-                .replicationStream()
-                .logical()
-                .withSlotName(slotName)
-                .withSlotOption("include-xids", false)
-                .withSlotOption("skip-empty-xacts", true)
-                .withStartPosition(waitLSN)
-                .start();
-
-        while (true) {
-            ByteBuffer byteBuffer = stream.readPending();
-            if (byteBuffer == null) {
-                TimeUnit.MILLISECONDS.sleep(10L);
-                continue;
-            }
-            int offset = byteBuffer.arrayOffset();
-            byte[] source = byteBuffer.array();
-            int length = source.length - offset;
-
-            String res = new String(source, offset, length);
-
-            ProducerRecord record = new ProducerRecord<>(TOPIC, res);
-            kafkaProducer.send(record);
-            System.out.println("send ok ==> " + res);
-
-
-            //如果需要flush lsn,根据业务实际情况调用以下接口
-            LogSequenceNumber lastRecv = stream.getLastReceiveLSN();
-            System.out.println(lastRecv);
-//            stream.setFlushedLSN(lastRecv);
-//            stream.forceUpdateStatus();
-
-        }
-    }
-
-}
-
diff --git a/src/main/java/UnitTest/OpenGaussReplicationToKafka.java b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java
new file mode 100644
index 0000000..6ef0fbe
--- /dev/null
+++ b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java
@@ -0,0 +1,171 @@
+package UnitTest;
+
+import com.dsideal.FengHuang.Util.CommonUtil;
+import com.jfinal.kit.StrKit;
+import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
+import com.jfinal.plugin.activerecord.Db;
+import com.jfinal.plugin.activerecord.Record;
+import com.jfinal.plugin.activerecord.dialect.PostgreSqlDialect;
+import com.jfinal.plugin.druid.DruidPlugin;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.postgresql.PGProperty;
+import org.postgresql.jdbc.PgConnection;
+import org.postgresql.replication.LogSequenceNumber;
+import org.postgresql.replication.PGReplicationStream;
+
+import java.nio.ByteBuffer;
+import java.sql.DriverManager;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+public class OpenGaussReplicationToKafka {
+
+    public static String SOURCE_URL = "jdbc:postgresql://10.10.14.61:15400/test_db";
+    public static String USER = "postgres";
+    public static String PASSWD = "DsideaL147258369";
+
+    public static String DRIVER_CLASS = "org.postgresql.Driver";
+    public static String TOPIC = "pg_test";//定义主题
+
+    public static final String BROKERS_ADDRESS = "10.10.14.67:9092";
+
+    public static void Init() {
+        //读取库
+        DruidPlugin druid = new DruidPlugin(SOURCE_URL, USER, PASSWD, DRIVER_CLASS);
+        druid.start();
+
+        ActiveRecordPlugin arp = new ActiveRecordPlugin(druid);
+        arp.setDialect(new PostgreSqlDialect());
+        arp.start();
+
+    }
+
+    public static void CreateSlot(String slotName) {
+        String sql = "select * from pg_create_logical_replication_slot(?, 'mppdb_decoding')";
+        Db.find(sql, slotName);
+    }
+
+    public static void DeleteSlot(String slotName) {
+        try {
+            String sql = "select pg_drop_replication_slot(?)";
+            Db.find(sql, slotName);
+        } catch (Exception err) {
+            System.out.println(err);
+        }
+
+    }
+
+    public static void ListSlot() {
+        String sql = "select * from pg_replication_slots";
+        List<Record> list = Db.find(sql);
+        System.out.println(list);
+    }
+
+    public static String GetRestartLsn(String slotName) {
+        String sql = "select restart_lsn from pg_replication_slots where slot_name=?";
+        List<Record> list = Db.find(sql, slotName);
+        if (list.size() > 0) return list.get(0).getStr("restart_lsn");
+        return null;
+    }
+
+    public static void InsertTestData() {
+        String sql = "select max(id) as maxid from test";
+        int maxId = 0;
+        Record record = Db.findFirst(sql);
+        if (record.get("maxid") != null) maxId = record.getInt("maxid");
+        maxId = maxId + 1;
+        sql = "insert into test(id,txt) values(?,?)";
+        Db.update(sql, maxId, "黄海的测试数据:" + maxId);
+    }
+
+    public static void TruncateTable() {
+        String sql = "truncate table test";
+        Db.update(sql);
+    }
+
+    public static void main(String[] args) throws Exception {
+        //初始化数据库链接
+        Init();
+
+        // 槽名
+        String slotName = "slot2";
+        // 删除槽
+        DeleteSlot(slotName);
+        // 创建槽
+        CreateSlot(slotName);
+        // 查看槽
+        //ListSlot();
+
+        //TruncateTable();
+
+        //插入测试数据
+        InsertTestData();
+
+        //获取最后的读取偏移位置
+        String lsn = GetRestartLsn(slotName);
+
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+        KafkaProducer kafkaProducer = new KafkaProducer(props);
+
+        Properties properties = new Properties();
+        PGProperty.USER.set(properties, USER);
+        PGProperty.PASSWORD.set(properties, PASSWD);
+        PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
+        PGProperty.REPLICATION.set(properties, "database");
+        PGProperty.PREFER_QUERY_MODE.set(properties, "simple");
+
+        Class.forName(DRIVER_CLASS);
+        PgConnection conn = (PgConnection) DriverManager.getConnection(SOURCE_URL, properties);
+        System.out.println("connection success!");
+
+        LogSequenceNumber waitLSN = LogSequenceNumber.valueOf(lsn);
+
+        PGReplicationStream stream = conn
+                .getReplicationAPI()
+                .replicationStream()
+                .logical()
+                .withSlotName(slotName)
+                .withSlotOption("include-xids", false)
+                .withSlotOption("skip-empty-xacts", true)
+                .withStartPosition(waitLSN)
+                .start();
+
+        System.out.println("本轮LSN起始位置:" + lsn);
+
+        try {
+            while (true) {
+                ByteBuffer byteBuffer = stream.readPending();
+                if (byteBuffer == null) {
+                    TimeUnit.MILLISECONDS.sleep(10L);
+                    continue;
+                }
+                int offset = byteBuffer.arrayOffset();
+                byte[] source = byteBuffer.array();
+                int length = source.length - offset;
+
+                String res = new String(source, offset, length);
+                if (res.equals("BEGIN")) continue;
+                if (res.startsWith("COMMIT")) continue;
+                ProducerRecord record = new ProducerRecord<>(TOPIC, res);
+                kafkaProducer.send(record);
+                System.out.println("send ok ==> " + res);
+                //feedback
+                stream.setAppliedLSN(stream.getLastReceiveLSN());
+                stream.setFlushedLSN(stream.getLastReceiveLSN());
+            }
+        } catch (Exception err) {
+            if (stream != null) {
+                stream.close();
+            }
+            if (conn != null) {
+                conn.close();
+            }
+        }
+    }
+}
+