main
kgdxpr 2 years ago
parent 65247210be
commit 3ec87d8ea7

@ -202,7 +202,7 @@
<div class="mui-card-header" style="font-size:18px;line-height: 1.8;padding-top: 25px;"><font style="color:#DD524D;">[{{:type_name}}]</font>&nbsp{{:content}}</div>
<div class="mui-card-content" style="font-size:16px;color:#576574">
<div class="mui-card-content-inner mui-input-group" style="background-color: #FFF5E1;padding-bottom: 15px;">
{{if type_id == 3}}
{{if type_id == 2}}
{{for xuanxiang}}
<div class="mui-input-row mui-checkbox mui-left">
<label>{{:key}}. {{:value}}</label>

@ -13,7 +13,7 @@
body {
background-image: url("../images/bg_img.png");
}
.title2_div {
text-align: center;
}
@ -57,7 +57,7 @@
.title1_p {
font-weight: 400;
font-style: normal;
font-size: 22px;
font-size: 20px;
color: #F4D7A1;
line-height: 30px;
word-wrap: break-word;
@ -213,8 +213,6 @@
<div class="input_div">
<button id='login' type="button" class="button">开始答题</button>
</div>
</div>
<div class="tam_div">

@ -64,7 +64,6 @@
<div class="mui-content-padded">
<button id='login' type="button" class="mui-btn mui-btn-block mui-btn-primary">开始答题</button>
</div>
@ -102,7 +101,6 @@
} else {
mui.alert(res.message, '提示');
}
},
error: function (xhr, type, errorThrown) {
//异常处理;

@ -202,7 +202,7 @@
{{/if}}
<span style="margin-left:50px">正确答案:<span style="color:#27ae60;">{{:answer}}</span></span>
</div>
<div class="mui-card-footer">解析:{{:memo}}</div>
<!-- <div class="mui-card-footer">解析:{{:memo}}</div> -->
</script>
</div>

@ -0,0 +1,170 @@
package UnitTest;
import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.Record;
import com.jfinal.plugin.activerecord.dialect.PostgreSqlDialect;
import com.jfinal.plugin.druid.DruidPlugin;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.postgresql.PGProperty;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;
import java.nio.ByteBuffer;
import java.sql.DriverManager;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class OpenGaussReplicationToKafka {
public static String SOURCE_URL = "jdbc:postgresql://10.10.14.61:15400/test_db";
public static String USER = "postgres";
public static String PASSWD = "DsideaL147258369";
public static String DRIVER_CLASS = "org.postgresql.Driver";
public static String TOPIC = "pg_test";//定义主题
public static final String BROKERS_ADDRESS = "10.10.14.67:9092";
public static void Init() {
//读取库
DruidPlugin druid = new DruidPlugin(SOURCE_URL, USER, PASSWD, DRIVER_CLASS);
druid.start();
ActiveRecordPlugin arp = new ActiveRecordPlugin(druid);
arp.setDialect(new PostgreSqlDialect());
arp.start();
}
public static void CreateSlot(String slotName) {
String sql = "select * from pg_create_logical_replication_slot(?, 'mppdb_decoding')";
Db.find(sql, slotName);
}
public static void DeleteSlot(String slotName) {
try {
String sql = "select pg_drop_replication_slot(?)";
Db.find(sql, slotName);
} catch (Exception err) {
System.out.println(err);
}
}
public static void ListSlot() {
String sql = "select * from pg_replication_slots";
List<Record> list = Db.find(sql);
System.out.println(list);
}
public static String GetRestartLsn(String slotName) {
String sql = "select restart_lsn from pg_replication_slots where slot_name=?";
List<Record> list = Db.find(sql, slotName);
if (list.size() > 0) return list.get(0).getStr("restart_lsn");
return null;
}
public static void InsertTestData() {
String sql = "select max(id) as maxid from test";
int maxId = 0;
Record record = Db.findFirst(sql);
if (record.get("maxid") != null) maxId = record.getInt("maxid");
maxId = maxId + 1;
sql = "insert into test(id,txt) values(?,?)";
Db.update(sql, maxId, "黄海的测试数据:" + maxId);
}
public static void TruncateTable() {
String sql = "truncate table test";
Db.update(sql);
}
public static void main(String[] args) throws Exception {
//初始化数据库链接
Init();
// 槽名
String slotName = "slot2";
// 删除槽
DeleteSlot(slotName);
// 创建槽
CreateSlot(slotName);
// 查看槽
//ListSlot();
//TruncateTable();
//插入测试数据
InsertTestData();
//获取最后的读取偏移位置
String lsn = GetRestartLsn(slotName);
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer(props);
Properties properties = new Properties();
PGProperty.USER.set(properties, USER);
PGProperty.PASSWORD.set(properties, PASSWD);
PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
PGProperty.REPLICATION.set(properties, "database");
PGProperty.PREFER_QUERY_MODE.set(properties, "simple");
Class.forName(DRIVER_CLASS);
PgConnection conn = (PgConnection) DriverManager.getConnection(SOURCE_URL, properties);
System.out.println("connection success!");
LogSequenceNumber waitLSN = LogSequenceNumber.valueOf(lsn);
PGReplicationStream stream = conn
.getReplicationAPI()
.replicationStream()
.logical()
.withSlotName(slotName)
.withSlotOption("include-xids", false)
.withSlotOption("skip-empty-xacts", true)
.withStartPosition(waitLSN)
.start();
System.out.println("本轮LSN起始位置" + lsn);
try {
while (true) {
ByteBuffer byteBuffer = stream.readPending();
if (byteBuffer == null) {
TimeUnit.MILLISECONDS.sleep(10L);
continue;
}
int offset = byteBuffer.arrayOffset();
byte[] source = byteBuffer.array();
int length = source.length - offset;
String res = new String(source, offset, length);
if (res.equals("BEGIN")) continue;
if (res.startsWith("COMMIT")) continue;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, res);
kafkaProducer.send(record);
System.out.println("send ok ==> " + res);
//feedback
stream.setAppliedLSN(stream.getLastReceiveLSN());
stream.setFlushedLSN(stream.getLastReceiveLSN());
}
} catch (Exception err) {
if (stream != null) {
stream.close();
}
if (conn != null) {
conn.close();
}
}
}
}
Loading…
Cancel
Save