package UnitTest; import com.jfinal.plugin.activerecord.ActiveRecordPlugin; import com.jfinal.plugin.activerecord.Db; import com.jfinal.plugin.activerecord.Record; import com.jfinal.plugin.activerecord.dialect.PostgreSqlDialect; import com.jfinal.plugin.druid.DruidPlugin; import com.jfinal.plugin.redis.Redis; import com.jfinal.plugin.redis.RedisPlugin; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.postgresql.PGProperty; import org.postgresql.jdbc.PgConnection; import org.postgresql.replication.LogSequenceNumber; import org.postgresql.replication.PGReplicationStream; import java.nio.ByteBuffer; import java.sql.DriverManager; import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; public class OpenGaussReplicationToKafka { public static String SOURCE_URL = "jdbc:postgresql://10.10.14.61:15400/test_db"; public static String USER = "postgres"; public static String PASSWD = "DsideaL147258369"; public static String DRIVER_CLASS = "org.postgresql.Driver"; public static String TOPIC = "pg_test";//定义主题 //KAFKA public static final String BROKERS_ADDRESS = "10.10.14.67:9092"; //REDIS public static final String REDIS_HOST = "10.10.14.199"; public static final int REDIS_PORT = 18890; public static final int EXPIRE_SECONDS = 3600 * 24 * 3; // 槽名 public static final String slotName = "slot_huanghai"; public static void Init() { //读取库 DruidPlugin druid = new DruidPlugin(SOURCE_URL, USER, PASSWD, DRIVER_CLASS); druid.start(); ActiveRecordPlugin arp = new ActiveRecordPlugin(druid); arp.setDialect(new PostgreSqlDialect()); arp.start(); // 用于缓存模块的redis服务 RedisPlugin redis = new RedisPlugin("myRedis", REDIS_HOST, REDIS_PORT, 10 * 1000); redis.start(); } public static void CreateSlot(String slotName) { String sql = "select * from pg_create_logical_replication_slot(?, 'mppdb_decoding')"; Db.find(sql, slotName); } public static void DeleteSlot(String slotName) { try { String sql = "select pg_drop_replication_slot(?)"; Db.find(sql, slotName); } catch (Exception err) { System.out.println(err); } } public static void ListSlot() { String sql = "select * from pg_replication_slots"; List list = Db.find(sql); System.out.println(list); } public static String GetRestartLsn(String slotName) { String sql = "select restart_lsn from pg_replication_slots where slot_name=?"; List list = Db.find(sql, slotName); if (list.size() > 0) return list.get(0).getStr("restart_lsn"); return null; } public static void InsertTestData() { String sql = "select max(id) as maxid from test"; int maxId = 0; Record record = Db.findFirst(sql); if (record.get("maxid") != null) maxId = record.getInt("maxid"); maxId = maxId + 1; sql = "insert into test(id,txt) values(?,?)"; Db.update(sql, maxId, "黄海的测试数据:" + maxId); } public static void TruncateTable() { String sql = "truncate table test"; Db.update(sql); } public static void main(String[] args) throws Exception { //初始化数据库链接 Init(); // 删除槽 <<<<<<< HEAD // DeleteSlot(slotName); // 创建槽 // CreateSlot(slotName); ======= // DeleteSlot(slotName); // 创建槽 //CreateSlot(slotName); >>>>>>> a305673283afe6bad2ca13126d63952d7ab891fe // 查看槽 //ListSlot(); // TruncateTable(); //插入测试数据 // InsertTestData(); //获取最后的读取偏移位置 String lsn = GetRestartLsn(slotName); Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer kafkaProducer = new KafkaProducer(props); Properties properties = new Properties(); PGProperty.USER.set(properties, USER); PGProperty.PASSWORD.set(properties, PASSWD); PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4"); PGProperty.REPLICATION.set(properties, "database"); PGProperty.PREFER_QUERY_MODE.set(properties, "simple"); Class.forName(DRIVER_CLASS); PgConnection conn = (PgConnection) DriverManager.getConnection(SOURCE_URL, properties); System.out.println("connection success!"); LogSequenceNumber waitLSN = LogSequenceNumber.valueOf(lsn); PGReplicationStream stream = conn .getReplicationAPI() .replicationStream() .logical() .withSlotName(slotName) .withStatusInterval(10, TimeUnit.SECONDS) .withSlotOption("include-xids", false) .withSlotOption("skip-empty-xacts", true) .withStartPosition(waitLSN) .start(); System.out.println("本轮LSN起始位置:" + lsn); try { while (true) { ByteBuffer byteBuffer = stream.readPending(); if (byteBuffer == null) { TimeUnit.MILLISECONDS.sleep(10L); continue; } LogSequenceNumber nowLsn = stream.getLastReceiveLSN(); String key = "LSN_" + nowLsn.asLong(); if (Redis.use().exists(key)) { System.out.println("发现已经处理完成的LSN=" + key + ",将放过此记录,不再加入kafka!"); continue; } int offset = byteBuffer.arrayOffset(); byte[] source = byteBuffer.array(); int length = source.length - offset; String res = new String(source, offset, length); if (res.equals("BEGIN")) continue; if (res.startsWith("COMMIT")) continue; ProducerRecord record = new ProducerRecord<>(TOPIC, res); kafkaProducer.send(record); System.out.println("send ok ==> " + res); <<<<<<< HEAD //feedback stream.setAppliedLSN(stream.getLastReceiveLSN()); stream.setFlushedLSN(stream.getLastReceiveLSN()); System.out.println(stream.getLastReceiveLSN()); ======= //write to redis Redis.use().setex(key, EXPIRE_SECONDS, 1); >>>>>>> a305673283afe6bad2ca13126d63952d7ab891fe } } catch (Exception err) { System.out.println(err); } } }