diff --git a/WebRoot/html/dangjian/view/exam.html b/WebRoot/html/dangjian/view/exam.html
index 289ecce..ebcd264 100644
--- a/WebRoot/html/dangjian/view/exam.html
+++ b/WebRoot/html/dangjian/view/exam.html
@@ -202,7 +202,7 @@
- {{if type_id == 3}}
+ {{if type_id == 2}}
{{for xuanxiang}}
diff --git a/WebRoot/html/dangjian/view/login_old.html b/WebRoot/html/dangjian/view/login_old.html
index 72202fa..02ba5d1 100644
--- a/WebRoot/html/dangjian/view/login_old.html
+++ b/WebRoot/html/dangjian/view/login_old.html
@@ -64,7 +64,6 @@
-
@@ -102,7 +101,6 @@
} else {
mui.alert(res.message, '提示');
}
-
},
error: function (xhr, type, errorThrown) {
//异常处理;
diff --git a/WebRoot/html/dangjian/view/result.html b/WebRoot/html/dangjian/view/result.html
index 9f34838..505396e 100644
--- a/WebRoot/html/dangjian/view/result.html
+++ b/WebRoot/html/dangjian/view/result.html
@@ -202,7 +202,7 @@
{{/if}}
正确答案:{{:answer}}
-
+
diff --git a/src/main/java/UnitTest/OpenGaussReplicationToKafka.java b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java
new file mode 100644
index 0000000..6fb44d7
--- /dev/null
+++ b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java
@@ -0,0 +1,170 @@
+package UnitTest;
+
+import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
+import com.jfinal.plugin.activerecord.Db;
+import com.jfinal.plugin.activerecord.Record;
+import com.jfinal.plugin.activerecord.dialect.PostgreSqlDialect;
+import com.jfinal.plugin.druid.DruidPlugin;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.postgresql.PGProperty;
+import org.postgresql.jdbc.PgConnection;
+import org.postgresql.replication.LogSequenceNumber;
+import org.postgresql.replication.PGReplicationStream;
+
+import java.nio.ByteBuffer;
+import java.sql.DriverManager;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+public class OpenGaussReplicationToKafka {
+
+ public static String SOURCE_URL = "jdbc:postgresql://10.10.14.61:15400/test_db";
+ public static String USER = "postgres";
+ public static String PASSWD = "DsideaL147258369";
+
+ public static String DRIVER_CLASS = "org.postgresql.Driver";
+ public static String TOPIC = "pg_test";//定义主题
+
+ public static final String BROKERS_ADDRESS = "10.10.14.67:9092";
+
+ public static void Init() {
+ //读取库
+ DruidPlugin druid = new DruidPlugin(SOURCE_URL, USER, PASSWD, DRIVER_CLASS);
+ druid.start();
+
+ ActiveRecordPlugin arp = new ActiveRecordPlugin(druid);
+ arp.setDialect(new PostgreSqlDialect());
+ arp.start();
+
+ }
+
+ public static void CreateSlot(String slotName) {
+ String sql = "select * from pg_create_logical_replication_slot(?, 'mppdb_decoding')";
+ Db.find(sql, slotName);
+ }
+
+ public static void DeleteSlot(String slotName) {
+ try {
+ String sql = "select pg_drop_replication_slot(?)";
+ Db.find(sql, slotName);
+ } catch (Exception err) {
+ System.out.println(err);
+ }
+
+ }
+
+ public static void ListSlot() {
+ String sql = "select * from pg_replication_slots";
+ List
list = Db.find(sql);
+ System.out.println(list);
+ }
+
+ public static String GetRestartLsn(String slotName) {
+ String sql = "select restart_lsn from pg_replication_slots where slot_name=?";
+ List list = Db.find(sql, slotName);
+ if (list.size() > 0) return list.get(0).getStr("restart_lsn");
+ return null;
+ }
+
+ public static void InsertTestData() {
+ String sql = "select max(id) as maxid from test";
+ int maxId = 0;
+ Record record = Db.findFirst(sql);
+ if (record.get("maxid") != null) maxId = record.getInt("maxid");
+ maxId = maxId + 1;
+ sql = "insert into test(id,txt) values(?,?)";
+ Db.update(sql, maxId, "黄海的测试数据:" + maxId);
+ }
+
+ public static void TruncateTable() {
+ String sql = "truncate table test";
+ Db.update(sql);
+ }
+
+ public static void main(String[] args) throws Exception {
+ //初始化数据库链接
+ Init();
+
+ // 槽名
+ String slotName = "slot2";
+ // 删除槽
+ DeleteSlot(slotName);
+ // 创建槽
+ CreateSlot(slotName);
+ // 查看槽
+ //ListSlot();
+
+ //TruncateTable();
+
+ //插入测试数据
+ InsertTestData();
+
+ //获取最后的读取偏移位置
+ String lsn = GetRestartLsn(slotName);
+
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ KafkaProducer kafkaProducer = new KafkaProducer(props);
+
+ Properties properties = new Properties();
+ PGProperty.USER.set(properties, USER);
+ PGProperty.PASSWORD.set(properties, PASSWD);
+ PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
+ PGProperty.REPLICATION.set(properties, "database");
+ PGProperty.PREFER_QUERY_MODE.set(properties, "simple");
+
+ Class.forName(DRIVER_CLASS);
+ PgConnection conn = (PgConnection) DriverManager.getConnection(SOURCE_URL, properties);
+ System.out.println("connection success!");
+
+ LogSequenceNumber waitLSN = LogSequenceNumber.valueOf(lsn);
+
+ PGReplicationStream stream = conn
+ .getReplicationAPI()
+ .replicationStream()
+ .logical()
+ .withSlotName(slotName)
+ .withSlotOption("include-xids", false)
+ .withSlotOption("skip-empty-xacts", true)
+ .withStartPosition(waitLSN)
+ .start();
+
+ System.out.println("本轮LSN起始位置:" + lsn);
+
+ try {
+ while (true) {
+ ByteBuffer byteBuffer = stream.readPending();
+ if (byteBuffer == null) {
+ TimeUnit.MILLISECONDS.sleep(10L);
+ continue;
+ }
+ int offset = byteBuffer.arrayOffset();
+ byte[] source = byteBuffer.array();
+ int length = source.length - offset;
+
+ String res = new String(source, offset, length);
+ if (res.equals("BEGIN")) continue;
+ if (res.startsWith("COMMIT")) continue;
+ ProducerRecord record = new ProducerRecord<>(TOPIC, res);
+ kafkaProducer.send(record);
+ System.out.println("send ok ==> " + res);
+ //feedback
+ stream.setAppliedLSN(stream.getLastReceiveLSN());
+ stream.setFlushedLSN(stream.getLastReceiveLSN());
+ }
+ } catch (Exception err) {
+ if (stream != null) {
+ stream.close();
+ }
+ if (conn != null) {
+ conn.close();
+ }
+ }
+ }
+}
+