diff --git a/src/main/java/UnitTest/OpenGaussReplicationToKafka.java b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java index 1cfa1d5..cd5e279 100644 --- a/src/main/java/UnitTest/OpenGaussReplicationToKafka.java +++ b/src/main/java/UnitTest/OpenGaussReplicationToKafka.java @@ -5,6 +5,8 @@ import com.jfinal.plugin.activerecord.Db; import com.jfinal.plugin.activerecord.Record; import com.jfinal.plugin.activerecord.dialect.PostgreSqlDialect; import com.jfinal.plugin.druid.DruidPlugin; +import com.jfinal.plugin.redis.Redis; +import com.jfinal.plugin.redis.RedisPlugin; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; @@ -28,8 +30,19 @@ public class OpenGaussReplicationToKafka { public static String DRIVER_CLASS = "org.postgresql.Driver"; public static String TOPIC = "pg_test";//定义主题 + //KAFKA public static final String BROKERS_ADDRESS = "10.10.14.67:9092"; + //REDIS + public static final String REDIS_HOST = "10.10.14.199"; + public static final int REDIS_PORT = 18890; + + public static final int EXPIRE_SECONDS = 3600 * 24 * 3; + + + // 槽名 + public static final String slotName = "slot_huanghai"; + public static void Init() { //读取库 DruidPlugin druid = new DruidPlugin(SOURCE_URL, USER, PASSWD, DRIVER_CLASS); @@ -39,6 +52,9 @@ public class OpenGaussReplicationToKafka { arp.setDialect(new PostgreSqlDialect()); arp.start(); + // 用于缓存模块的redis服务 + RedisPlugin redis = new RedisPlugin("myRedis", REDIS_HOST, REDIS_PORT, 10 * 1000); + redis.start(); } public static void CreateSlot(String slotName) { @@ -53,7 +69,6 @@ public class OpenGaussReplicationToKafka { } catch (Exception err) { System.out.println(err); } - } public static void ListSlot() { @@ -62,13 +77,6 @@ public class OpenGaussReplicationToKafka { System.out.println(list); } - public static String GetLsn(String slotName) { - String sql = "select confirmed_flush from pg_replication_slots where slot_name=?"; - List list = Db.find(sql, slotName); - if (list.size() > 0) return list.get(0).getStr("confirmed_flush"); - return null; - } - public static String GetRestartLsn(String slotName) { String sql = "select restart_lsn from pg_replication_slots where slot_name=?"; List list = Db.find(sql, slotName); @@ -95,12 +103,16 @@ public class OpenGaussReplicationToKafka { //初始化数据库链接 Init(); - // 槽名 - String slotName = "slot2"; // 删除槽 +<<<<<<< HEAD // DeleteSlot(slotName); // 创建槽 // CreateSlot(slotName); +======= + // DeleteSlot(slotName); + // 创建槽 + //CreateSlot(slotName); +>>>>>>> a305673283afe6bad2ca13126d63952d7ab891fe // 查看槽 //ListSlot(); @@ -112,7 +124,7 @@ public class OpenGaussReplicationToKafka { //获取最后的读取偏移位置 - String lsn = GetLsn(slotName); + String lsn = GetRestartLsn(slotName); Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS); @@ -138,6 +150,7 @@ public class OpenGaussReplicationToKafka { .replicationStream() .logical() .withSlotName(slotName) + .withStatusInterval(10, TimeUnit.SECONDS) .withSlotOption("include-xids", false) .withSlotOption("skip-empty-xacts", true) .withStartPosition(waitLSN) @@ -152,6 +165,13 @@ public class OpenGaussReplicationToKafka { TimeUnit.MILLISECONDS.sleep(10L); continue; } + LogSequenceNumber nowLsn = stream.getLastReceiveLSN(); + String key = "LSN_" + nowLsn.asLong(); + if (Redis.use().exists(key)) { + System.out.println("发现已经处理完成的LSN=" + key + ",将放过此记录,不再加入kafka!"); + continue; + } + int offset = byteBuffer.arrayOffset(); byte[] source = byteBuffer.array(); int length = source.length - offset; @@ -162,19 +182,19 @@ public class OpenGaussReplicationToKafka { ProducerRecord record = new ProducerRecord<>(TOPIC, res); kafkaProducer.send(record); System.out.println("send ok ==> " + res); +<<<<<<< HEAD //feedback stream.setAppliedLSN(stream.getLastReceiveLSN()); stream.setFlushedLSN(stream.getLastReceiveLSN()); System.out.println(stream.getLastReceiveLSN()); +======= + //write to redis + Redis.use().setex(key, EXPIRE_SECONDS, 1); +>>>>>>> a305673283afe6bad2ca13126d63952d7ab891fe } } catch (Exception err) { - if (stream != null) { - stream.close(); - } - if (conn != null) { - conn.close(); - } + System.out.println(err); } } }