From dc09314d83bc09983bc296d9b95936b9edfdc4d9 Mon Sep 17 00:00:00 2001 From: kgdxpr Date: Tue, 30 May 2023 15:00:49 +0800 Subject: [PATCH] 'commit' --- .../UnitTest/OpenGaussReplicationToKafka.java | 154 ++++++++++++++++++ 1 file changed, 154 insertions(+) diff --git a/src/main/java/UnitTest/OpenGaussReplicationToKafka.java b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java index 0c6a823..6be7aab 100644 --- a/src/main/java/UnitTest/OpenGaussReplicationToKafka.java +++ b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java @@ -1,197 +1,351 @@ package UnitTest; + import com.jfinal.plugin.activerecord.ActiveRecordPlugin; + import com.jfinal.plugin.activerecord.Db; + import com.jfinal.plugin.activerecord.Record; + import com.jfinal.plugin.activerecord.dialect.PostgreSqlDialect; + import com.jfinal.plugin.druid.DruidPlugin; + import com.jfinal.plugin.redis.Redis; + import com.jfinal.plugin.redis.RedisPlugin; + import org.apache.kafka.clients.producer.KafkaProducer; + import org.apache.kafka.clients.producer.ProducerConfig; + import org.apache.kafka.clients.producer.ProducerRecord; + import org.postgresql.PGProperty; + import org.postgresql.jdbc.PgConnection; + import org.postgresql.replication.LogSequenceNumber; + import org.postgresql.replication.PGReplicationStream; + import java.nio.ByteBuffer; + import java.sql.DriverManager; + import java.util.List; + import java.util.Properties; + import java.util.concurrent.TimeUnit; + public class OpenGaussReplicationToKafka { + public static String SOURCE_URL = "jdbc:postgresql://10.10.14.61:15400/test_db"; + public static String USER = "postgres"; + public static String PASSWD = "DsideaL147258369"; + public static String DRIVER_CLASS = "org.postgresql.Driver"; + public static String TOPIC = "pg_test";//定义主题 + //KAFKA + public static final String BROKERS_ADDRESS = "10.10.14.67:9092"; + //REDIS + public static final String REDIS_HOST = "10.10.14.199"; + public static final int REDIS_PORT = 18890; + public static final int EXPIRE_SECONDS = 3600 * 24 * 3; // 槽名 + public static final String slotName = "slot_huanghai"; + public static void Init() { + //读取库 + DruidPlugin druid = new DruidPlugin(SOURCE_URL, USER, PASSWD, DRIVER_CLASS); + druid.start(); + ActiveRecordPlugin arp = new ActiveRecordPlugin(druid); + arp.setDialect(new PostgreSqlDialect()); + arp.start(); + // 用于缓存模块的redis服务 + RedisPlugin redis = new RedisPlugin("myRedis", REDIS_HOST, REDIS_PORT, 10 * 1000); + redis.start(); + } + public static void CreateSlot(String slotName) { + String sql = "select * from pg_create_logical_replication_slot(?, 'mppdb_decoding')"; + Db.find(sql, slotName); + } + public static void DeleteSlot(String slotName) { + try { + String sql = "select pg_drop_replication_slot(?)"; + Db.find(sql, slotName); + } catch (Exception err) { + System.out.println(err); + } + } + public static void ListSlot() { + String sql = "select * from pg_replication_slots"; + List list = Db.find(sql); + System.out.println(list); + } + public static String GetRestartLsn(String slotName) { + String sql = "select restart_lsn from pg_replication_slots where slot_name=?"; + List list = Db.find(sql, slotName); + if (list.size() > 0) return list.get(0).getStr("restart_lsn"); + return null; + } + public static void InsertTestData() { + String sql = "select max(id) as maxid from test"; + int maxId = 0; + Record record = Db.findFirst(sql); + if (record.get("maxid") != null) maxId = record.getInt("maxid"); + maxId = maxId + 1; + sql = "insert into test(id,txt) values(?,?)"; + Db.update(sql, maxId, "黄海的测试数据:" + maxId); + } + public static void TruncateTable() { + String sql = "truncate table test"; + Db.update(sql); + } + public static void main(String[] args) throws Exception { + //初始化数据库链接 + Init(); + // 删除槽 + // DeleteSlot(slotName); + // 创建槽 + // CreateSlot(slotName); + // 查看槽 + //ListSlot(); // TruncateTable(); + //插入测试数据 + // InsertTestData(); //获取最后的读取偏移位置 + String lsn = GetRestartLsn(slotName); + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + KafkaProducer kafkaProducer = new KafkaProducer(props); + Properties properties = new Properties(); + PGProperty.USER.set(properties, USER); + PGProperty.PASSWORD.set(properties, PASSWD); + PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4"); + PGProperty.REPLICATION.set(properties, "database"); + PGProperty.PREFER_QUERY_MODE.set(properties, "simple"); + Class.forName(DRIVER_CLASS); + PgConnection conn = (PgConnection) DriverManager.getConnection(SOURCE_URL, properties); + System.out.println("connection success!"); + LogSequenceNumber waitLSN = LogSequenceNumber.valueOf(lsn); + PGReplicationStream stream = conn + .getReplicationAPI() + .replicationStream() + .logical() + .withSlotName(slotName) + .withStatusInterval(10, TimeUnit.SECONDS) + .withSlotOption("include-xids", false) + .withSlotOption("skip-empty-xacts", true) + .withStartPosition(waitLSN) + .start(); + System.out.println("本轮LSN起始位置:" + lsn); + try { + while (true) { + ByteBuffer byteBuffer = stream.readPending(); + if (byteBuffer == null) { + TimeUnit.MILLISECONDS.sleep(10L); + continue; + } + LogSequenceNumber nowLsn = stream.getLastReceiveLSN(); + String key = "LSN_" + nowLsn.asLong(); + if (Redis.use().exists(key)) { + System.out.println("发现已经处理完成的LSN=" + key + ",将放过此记录,不再加入kafka!"); + stream.setAppliedLSN(nowLsn); + stream.setFlushedLSN(nowLsn); + continue; + } + int offset = byteBuffer.arrayOffset(); + byte[] source = byteBuffer.array(); + int length = source.length - offset; + String res = new String(source, offset, length); + if (res.equals("BEGIN")) continue; + if (res.startsWith("COMMIT")) continue; + ProducerRecord record = new ProducerRecord<>(TOPIC, res); + kafkaProducer.send(record); + System.out.println("send ok ==> " + res); + stream.setAppliedLSN(nowLsn); + stream.setFlushedLSN(nowLsn); + stream.forceUpdateStatus(); + //write to redis + Redis.use().setex(key, EXPIRE_SECONDS, 1); + } + } catch (Exception err) { + System.out.println(err); + } + } + } + +