From 3ec87d8ea7f4eb519324253cd8797e40d2536875 Mon Sep 17 00:00:00 2001 From: kgdxpr Date: Wed, 31 May 2023 13:15:49 +0800 Subject: [PATCH] 'commit' --- WebRoot/html/dangjian/view/exam.html | 2 +- WebRoot/html/dangjian/view/login.html | 6 +- WebRoot/html/dangjian/view/login_old.html | 2 - WebRoot/html/dangjian/view/result.html | 2 +- .../UnitTest/OpenGaussReplicationToKafka.java | 170 ++++++++++++++++++ 5 files changed, 174 insertions(+), 8 deletions(-) create mode 100644 src/main/java/UnitTest/OpenGaussReplicationToKafka.java diff --git a/WebRoot/html/dangjian/view/exam.html b/WebRoot/html/dangjian/view/exam.html index 289ecce..ebcd264 100644 --- a/WebRoot/html/dangjian/view/exam.html +++ b/WebRoot/html/dangjian/view/exam.html @@ -202,7 +202,7 @@
[{{:type_name}}] {{:content}}
- {{if type_id == 3}} + {{if type_id == 2}} {{for xuanxiang}}
diff --git a/WebRoot/html/dangjian/view/login.html b/WebRoot/html/dangjian/view/login.html index 739ed53..644737f 100644 --- a/WebRoot/html/dangjian/view/login.html +++ b/WebRoot/html/dangjian/view/login.html @@ -13,7 +13,7 @@ body { background-image: url("../images/bg_img.png"); } - + .title2_div { text-align: center; } @@ -57,7 +57,7 @@ .title1_p { font-weight: 400; font-style: normal; - font-size: 22px; + font-size: 20px; color: #F4D7A1; line-height: 30px; word-wrap: break-word; @@ -213,8 +213,6 @@
- -
diff --git a/WebRoot/html/dangjian/view/login_old.html b/WebRoot/html/dangjian/view/login_old.html index 72202fa..02ba5d1 100644 --- a/WebRoot/html/dangjian/view/login_old.html +++ b/WebRoot/html/dangjian/view/login_old.html @@ -64,7 +64,6 @@
-
@@ -102,7 +101,6 @@ } else { mui.alert(res.message, '提示'); } - }, error: function (xhr, type, errorThrown) { //异常处理; diff --git a/WebRoot/html/dangjian/view/result.html b/WebRoot/html/dangjian/view/result.html index 9f34838..505396e 100644 --- a/WebRoot/html/dangjian/view/result.html +++ b/WebRoot/html/dangjian/view/result.html @@ -202,7 +202,7 @@ {{/if}} 正确答案:{{:answer}}
- +
diff --git a/src/main/java/UnitTest/OpenGaussReplicationToKafka.java b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java
new file mode 100644
index 0000000..6fb44d7
--- /dev/null
+++ b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java
@@ -0,0 +1,170 @@
package UnitTest;

import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.Record;
import com.jfinal.plugin.activerecord.dialect.PostgreSqlDialect;
import com.jfinal.plugin.druid.DruidPlugin;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.postgresql.PGProperty;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.DriverManager;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * Streams logical-replication changes from an openGauss/PostgreSQL database
 * into a Kafka topic via the pgjdbc replication API.
 *
 * <p>Flow: (re)create a logical replication slot decoded by
 * {@code mppdb_decoding}, insert a test row, then tail the slot from its
 * {@code restart_lsn} and forward each decoded change to Kafka, acknowledging
 * the received LSN back to the server after every send.
 */
public class OpenGaussReplicationToKafka {

    public static String SOURCE_URL = "jdbc:postgresql://10.10.14.61:15400/test_db";
    public static String USER = "postgres";
    public static String PASSWD = "DsideaL147258369";

    public static String DRIVER_CLASS = "org.postgresql.Driver";
    // Kafka topic that receives the decoded change records.
    public static String TOPIC = "pg_test";

    public static final String BROKERS_ADDRESS = "10.10.14.67:9092";

    /**
     * Initializes the JFinal ActiveRecord plugin against the source database
     * so the static {@code Db} helpers below can run SQL.
     */
    public static void Init() {
        DruidPlugin druid = new DruidPlugin(SOURCE_URL, USER, PASSWD, DRIVER_CLASS);
        druid.start();

        ActiveRecordPlugin arp = new ActiveRecordPlugin(druid);
        arp.setDialect(new PostgreSqlDialect());
        arp.start();
    }

    /**
     * Creates a logical replication slot decoded by the mppdb_decoding plugin.
     *
     * @param slotName name of the slot to create
     */
    public static void CreateSlot(String slotName) {
        String sql = "select * from pg_create_logical_replication_slot(?, 'mppdb_decoding')";
        Db.find(sql, slotName);
    }

    /**
     * Drops the replication slot. Best-effort: a slot that does not exist yet
     * raises a server error, which is logged and ignored so a fresh run can
     * proceed to {@link #CreateSlot}.
     *
     * @param slotName name of the slot to drop
     */
    public static void DeleteSlot(String slotName) {
        try {
            String sql = "select pg_drop_replication_slot(?)";
            Db.find(sql, slotName);
        } catch (Exception err) {
            // Expected on first run (slot absent); log and continue.
            System.out.println(err);
        }
    }

    /** Prints every row of pg_replication_slots (debug helper). */
    public static void ListSlot() {
        String sql = "select * from pg_replication_slots";
        List<Record> list = Db.find(sql);
        System.out.println(list);
    }

    /**
     * Looks up the slot's restart position.
     *
     * @param slotName slot to query
     * @return the restart_lsn string, or {@code null} if the slot is missing
     */
    public static String GetRestartLsn(String slotName) {
        String sql = "select restart_lsn from pg_replication_slots where slot_name=?";
        List<Record> list = Db.find(sql, slotName);
        if (!list.isEmpty()) {
            return list.get(0).getStr("restart_lsn");
        }
        return null;
    }

    /** Inserts one test row into {@code test} with id = max(id)+1. */
    public static void InsertTestData() {
        String sql = "select max(id) as maxid from test";
        int maxId = 0;
        Record record = Db.findFirst(sql);
        // max(id) is NULL on an empty table; start from 1 in that case.
        if (record.get("maxid") != null) maxId = record.getInt("maxid");
        maxId = maxId + 1;
        sql = "insert into test(id,txt) values(?,?)";
        Db.update(sql, maxId, "黄海的测试数据:" + maxId);
    }

    /** Empties the test table. */
    public static void TruncateTable() {
        String sql = "truncate table test";
        Db.update(sql);
    }

    public static void main(String[] args) throws Exception {
        // Initialize the database connection pool / ActiveRecord.
        Init();

        // Slot name used for this run.
        String slotName = "slot2";
        // Drop then recreate the slot so we start from a known position.
        DeleteSlot(slotName);
        CreateSlot(slotName);
        // Debug helpers, kept for reference:
        //ListSlot();
        //TruncateTable();

        // Insert test data so the stream has something to deliver.
        InsertTestData();

        // Resume point: the slot's restart_lsn.
        String lsn = GetRestartLsn(slotName);

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // Replication connections require these pgjdbc properties.
        Properties properties = new Properties();
        PGProperty.USER.set(properties, USER);
        PGProperty.PASSWORD.set(properties, PASSWD);
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
        PGProperty.REPLICATION.set(properties, "database");
        PGProperty.PREFER_QUERY_MODE.set(properties, "simple");

        Class.forName(DRIVER_CLASS);

        // try-with-resources: producer and connection are closed on every exit
        // path (the original closed them only inside the catch block, leaking
        // them otherwise and swallowing the failure).
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
             PgConnection conn = (PgConnection) DriverManager.getConnection(SOURCE_URL, properties)) {
            System.out.println("connection success!");

            LogSequenceNumber waitLSN = LogSequenceNumber.valueOf(lsn);

            PGReplicationStream stream = conn
                    .getReplicationAPI()
                    .replicationStream()
                    .logical()
                    .withSlotName(slotName)
                    .withSlotOption("include-xids", false)
                    .withSlotOption("skip-empty-xacts", true)
                    .withStartPosition(waitLSN)
                    .start();

            System.out.println("本轮LSN起始位置:" + lsn);

            try {
                while (true) {
                    // Non-blocking read; null means no pending change yet.
                    ByteBuffer byteBuffer = stream.readPending();
                    if (byteBuffer == null) {
                        TimeUnit.MILLISECONDS.sleep(10L);
                        continue;
                    }
                    int offset = byteBuffer.arrayOffset();
                    byte[] source = byteBuffer.array();
                    int length = source.length - offset;

                    // Decode the change explicitly as UTF-8 rather than the
                    // platform default charset (assumes the server encoding is
                    // UTF-8 — TODO confirm against the database settings).
                    String res = new String(source, offset, length, StandardCharsets.UTF_8);
                    // Transaction markers carry no row data; skip them.
                    if (res.equals("BEGIN")) continue;
                    if (res.startsWith("COMMIT")) continue;
                    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, res);
                    kafkaProducer.send(record);
                    System.out.println("send ok ==> " + res);
                    // Acknowledge the position so the server can recycle WAL.
                    stream.setAppliedLSN(stream.getLastReceiveLSN());
                    stream.setFlushedLSN(stream.getLastReceiveLSN());
                }
            } finally {
                // Close the stream on both normal and exceptional exit;
                // exceptions propagate instead of being silently swallowed.
                stream.close();
            }
        }
    }
}