From 44891fbbce246898c9abee667c4a51c823f5cf57 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com>
Date: Wed, 31 May 2023 14:07:06 +0800
Subject: [PATCH] 'commit'

---
 .../UnitTest/OpenGaussReplicationToKafka.java | 136 ------------------
 1 file changed, 136 deletions(-)

diff --git a/src/main/java/UnitTest/OpenGaussReplicationToKafka.java b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java
index c91214b..9e44687 100644
--- a/src/main/java/UnitTest/OpenGaussReplicationToKafka.java
+++ b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java
@@ -41,147 +41,81 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
-public class OpenGaussReplicationToKafka {
-
-    public static String SOURCE_URL = "jdbc:postgresql://10.10.14.61:15400/test_db";
-    public static String USER = "postgres";
-    public static String PASSWD = "DsideaL147258369";
-
-    public static String DRIVER_CLASS = "org.postgresql.Driver";
-    public static String TOPIC = "pg_test"; // Kafka topic name
-
-    //KAFKA
-    public static final String BROKERS_ADDRESS = "10.10.14.67:9092";
-
-    //REDIS
-    public static final String REDIS_HOST = "10.10.14.199";
-    public static final int REDIS_PORT = 18890;
-
-    public static final int EXPIRE_SECONDS = 3600 * 24 * 3;
-
-    // replication slot name
-    public static final String slotName = "slot_huanghai";
-
-    public static void Init() {
-        // source database
-        DruidPlugin druid = new DruidPlugin(SOURCE_URL, USER, PASSWD, DRIVER_CLASS);
-        druid.start();
-
-        ActiveRecordPlugin arp = new ActiveRecordPlugin(druid);
-        arp.setDialect(new PostgreSqlDialect());
-        arp.start();
-
-        // Redis service used by the cache module
-        RedisPlugin redis = new RedisPlugin("myRedis", REDIS_HOST, REDIS_PORT, 10 * 1000);
-        redis.start();
-    }
 
     public static void CreateSlot(String slotName) {
-        String sql = "select * from pg_create_logical_replication_slot(?, 'mppdb_decoding')";
-        Db.find(sql, slotName);
-    }
 
     public static void DeleteSlot(String slotName) {
-        try {
-            String sql = "select pg_drop_replication_slot(?)";
-            Db.find(sql, slotName);
-        } catch (Exception err) {
-            System.out.println(err);
-        }
-    }
 
     public static void ListSlot() {
-        String sql = "select * from pg_replication_slots";
-        List list = Db.find(sql);
-        System.out.println(list);
-    }
 
     public static String GetRestartLsn(String slotName) {
-        String sql = "select restart_lsn from pg_replication_slots where slot_name=?";
-        List list = Db.find(sql, slotName);
-        if (list.size() > 0) return list.get(0).getStr("restart_lsn");
-        return null;
-    }
 
     public static void InsertTestData() {
-        String sql = "select max(id) as maxid from test";
-        int maxId = 0;
-        Record record = Db.findFirst(sql);
-        if (record.get("maxid") != null) maxId = record.getInt("maxid");
-        maxId = maxId + 1;
-        sql = "insert into test(id,txt) values(?,?)";
-        Db.update(sql, maxId, "Huang Hai's test data: " + maxId);
-    }
 
     public static void TruncateTable() {
-        String sql = "truncate table test";
-        Db.update(sql);
-    }
-    public static void main(String[] args) throws Exception {
-        // initialize the database connection
         Init();
 
         // drop the slot
@@ -196,143 +130,73 @@ public class OpenGaussReplicationToKafka {
         // fetch the last read offset (restart_lsn)
         String lsn = GetRestartLsn(slotName);
-
-        Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-        KafkaProducer kafkaProducer = new KafkaProducer(props);
-        Properties properties = new Properties();
-        PGProperty.USER.set(properties, USER);
-        PGProperty.PASSWORD.set(properties, PASSWD);
-        PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
-        PGProperty.REPLICATION.set(properties, "database");
-        PGProperty.PREFER_QUERY_MODE.set(properties, "simple");
-
-        Class.forName(DRIVER_CLASS);
-        PgConnection conn = (PgConnection) DriverManager.getConnection(SOURCE_URL, properties);
-        System.out.println("connection success!");
-        LogSequenceNumber waitLSN = LogSequenceNumber.valueOf(lsn);
-
-        PGReplicationStream stream = conn
-                .getReplicationAPI()
-                .replicationStream()
-                .logical()
-                .withSlotName(slotName)
-                .withStatusInterval(10, TimeUnit.SECONDS)
-                .withSlotOption("include-xids", false)
-                .withSlotOption("skip-empty-xacts", true)
-                .withStartPosition(waitLSN)
-                .start();
-        System.out.println("Starting LSN for this round: " + lsn);
-
-        try {
-            while (true) {
-                ByteBuffer byteBuffer = stream.readPending();
-                if (byteBuffer == null) {
-                    TimeUnit.MILLISECONDS.sleep(10L);
-                    continue;
-                }
-                LogSequenceNumber nowLsn = stream.getLastReceiveLSN();
-                String key = "LSN_" + nowLsn.asLong();
-                if (Redis.use().exists(key)) {
-                    System.out.println("Found already-processed LSN=" + key + ", skipping this record; it will not be sent to Kafka!");
-                    stream.setAppliedLSN(nowLsn);
-                    stream.setFlushedLSN(nowLsn);
-                    continue;
-                }
-
-                int offset = byteBuffer.arrayOffset();
-                byte[] source = byteBuffer.array();
-                int length = source.length - offset;
-
-                String res = new String(source, offset, length);
-                if (res.equals("BEGIN")) continue;
-                if (res.startsWith("COMMIT")) continue;
-                JSONObject jo = JSONObject.parseObject(res);
                 jo.put("LSN", key);
                 res = jo.toString();
-                ProducerRecord record = new ProducerRecord<>(TOPIC, res);
-                kafkaProducer.send(record);
-                System.out.println("send ok ==> " + res);
-                stream.setAppliedLSN(nowLsn);
-                stream.setFlushedLSN(nowLsn);
-                stream.forceUpdateStatus();
-
-                //write to redis
-                Redis.use().setex(key, EXPIRE_SECONDS, 1);
-
-            }
-        } catch (Exception err) {
-            System.out.println(err);
-        }
-    }
-}
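
Notes on the removed code, for anyone reimplementing it elsewhere:

The class managed its logical replication slot with plain SQL through JFinal's Db helper, passing openGauss's mppdb_decoding output plugin to pg_create_logical_replication_slot. The same slot management works over bare JDBC. The sketch below is an assumption-laden illustration, not the removed code: the URL, credentials, and slot name are placeholders, and on vanilla PostgreSQL you would name a plugin such as test_decoding instead of mppdb_decoding.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    public class SlotAdminSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder URL and credentials; substitute your own.
            String url = "jdbc:postgresql://localhost:5432/test_db";
            try (Connection conn = DriverManager.getConnection(url, "postgres", "secret")) {
                // Create the slot; 'mppdb_decoding' is the openGauss decoding plugin.
                try (PreparedStatement ps = conn.prepareStatement(
                        "select * from pg_create_logical_replication_slot(?, 'mppdb_decoding')")) {
                    ps.setString(1, "slot_example"); // hypothetical slot name
                    ps.execute();
                }
                // Drop the slot when it is no longer needed; otherwise the
                // server retains WAL for it indefinitely.
                try (PreparedStatement ps = conn.prepareStatement(
                        "select pg_drop_replication_slot(?)")) {
                    ps.setString(1, "slot_example");
                    ps.execute();
                }
            }
        }
    }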
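The heart of the removed class was the consume-and-forward loop: open a pgjdbc replication connection (which requires the replication=database, preferQueryMode=simple, and assumeMinServerVersion=9.4 connection properties), read pending changes from the slot, publish each change to Kafka, and acknowledge the LSN back to the server. A condensed sketch follows, under the same caveats: broker address, URL, credentials, topic, and slot name are placeholders, and the original's restart-LSN start position and Redis-based de-duplication are omitted.

    import java.nio.ByteBuffer;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.postgresql.PGConnection;
    import org.postgresql.PGProperty;
    import org.postgresql.replication.LogSequenceNumber;
    import org.postgresql.replication.PGReplicationStream;

    public class ReplicationToKafkaSketch {
        public static void main(String[] args) throws Exception {
            // Kafka producer with String key/value serializers (placeholder broker).
            Properties kafkaProps = new Properties();
            kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);

            // Replication connections need these three pgjdbc properties.
            Properties pgProps = new Properties();
            PGProperty.USER.set(pgProps, "postgres");
            PGProperty.PASSWORD.set(pgProps, "secret");
            PGProperty.ASSUME_MIN_SERVER_VERSION.set(pgProps, "9.4");
            PGProperty.REPLICATION.set(pgProps, "database");
            PGProperty.PREFER_QUERY_MODE.set(pgProps, "simple");

            Connection conn = DriverManager.getConnection(
                    "jdbc:postgresql://localhost:5432/test_db", pgProps); // placeholder URL
            PGReplicationStream stream = conn.unwrap(PGConnection.class)
                    .getReplicationAPI()
                    .replicationStream()
                    .logical()
                    .withSlotName("slot_example") // hypothetical slot name
                    .withStatusInterval(10, TimeUnit.SECONDS)
                    .start();

            while (true) {
                ByteBuffer buf = stream.readPending(); // non-blocking read
                if (buf == null) {
                    TimeUnit.MILLISECONDS.sleep(10L);
                    continue;
                }
                String change = new String(buf.array(), buf.arrayOffset(),
                        buf.array().length - buf.arrayOffset());
                producer.send(new ProducerRecord<>("pg_test", change));
                // Acknowledge the position so the server can recycle WAL;
                // without this the slot pins WAL and disk usage grows.
                LogSequenceNumber lsn = stream.getLastReceiveLSN();
                stream.setAppliedLSN(lsn);
                stream.setFlushedLSN(lsn);
            }
        }
    }

For crash-safe delivery, the original additionally wrote an "LSN_<position>" key to Redis with a three-day TTL for every record it published, so that restarting from an older restart_lsn would skip changes already sent to Kafka.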