main
黄海 2 years ago
parent 7db4551553
commit a305673283

@ -5,6 +5,8 @@ import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.Record;
import com.jfinal.plugin.activerecord.dialect.PostgreSqlDialect;
import com.jfinal.plugin.druid.DruidPlugin;
import com.jfinal.plugin.redis.Redis;
import com.jfinal.plugin.redis.RedisPlugin;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
@ -28,8 +30,19 @@ public class OpenGaussReplicationToKafka {
public static String DRIVER_CLASS = "org.postgresql.Driver";
public static String TOPIC = "pg_test";//定义主题
//KAFKA
public static final String BROKERS_ADDRESS = "10.10.14.67:9092";
//REDIS
public static final String REDIS_HOST = "10.10.14.199";
public static final int REDIS_PORT = 18890;
public static final int EXPIRE_SECONDS = 3600 * 24 * 3;
// 槽名
public static final String slotName = "slot_huanghai";
public static void Init() {
//读取库
DruidPlugin druid = new DruidPlugin(SOURCE_URL, USER, PASSWD, DRIVER_CLASS);
@ -39,6 +52,9 @@ public class OpenGaussReplicationToKafka {
arp.setDialect(new PostgreSqlDialect());
arp.start();
// 用于缓存模块的redis服务
RedisPlugin redis = new RedisPlugin("myRedis", REDIS_HOST, REDIS_PORT, 10 * 1000);
redis.start();
}
public static void CreateSlot(String slotName) {
@ -53,7 +69,6 @@ public class OpenGaussReplicationToKafka {
} catch (Exception err) {
System.out.println(err);
}
}
public static void ListSlot() {
@ -62,13 +77,6 @@ public class OpenGaussReplicationToKafka {
System.out.println(list);
}
public static String GetLsn(String slotName) {
String sql = "select confirmed_flush from pg_replication_slots where slot_name=?";
List<Record> list = Db.find(sql, slotName);
if (list.size() > 0) return list.get(0).getStr("confirmed_flush");
return null;
}
public static String GetRestartLsn(String slotName) {
String sql = "select restart_lsn from pg_replication_slots where slot_name=?";
List<Record> list = Db.find(sql, slotName);
@ -95,12 +103,10 @@ public class OpenGaussReplicationToKafka {
//初始化数据库链接
Init();
// 槽名
String slotName = "slot2";
// 删除槽
DeleteSlot(slotName);
// DeleteSlot(slotName);
// 创建槽
CreateSlot(slotName);
//CreateSlot(slotName);
// 查看槽
//ListSlot();
@ -110,7 +116,7 @@ public class OpenGaussReplicationToKafka {
//InsertTestData();
//获取最后的读取偏移位置
String lsn = GetLsn(slotName);
String lsn = GetRestartLsn(slotName);
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS);
@ -136,6 +142,7 @@ public class OpenGaussReplicationToKafka {
.replicationStream()
.logical()
.withSlotName(slotName)
.withStatusInterval(10, TimeUnit.SECONDS)
.withSlotOption("include-xids", false)
.withSlotOption("skip-empty-xacts", true)
.withStartPosition(waitLSN)
@ -150,6 +157,13 @@ public class OpenGaussReplicationToKafka {
TimeUnit.MILLISECONDS.sleep(10L);
continue;
}
LogSequenceNumber nowLsn = stream.getLastReceiveLSN();
String key = "LSN_" + nowLsn.asLong();
if (Redis.use().exists(key)) {
System.out.println("发现已经处理完成的LSN=" + key + ",将放过此记录不再加入kafka!");
continue;
}
int offset = byteBuffer.arrayOffset();
byte[] source = byteBuffer.array();
int length = source.length - offset;
@ -160,20 +174,11 @@ public class OpenGaussReplicationToKafka {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, res);
kafkaProducer.send(record);
System.out.println("send ok ==> " + res);
//feedback
stream.setAppliedLSN(stream.getLastReceiveLSN());
stream.setFlushedLSN(stream.getLastReceiveLSN());
stream.forceUpdateStatus();
System.out.println(stream.getLastReceiveLSN());
//write to redis
Redis.use().setex(key, EXPIRE_SECONDS, 1);
}
} catch (Exception err) {
if (stream != null) {
stream.close();
}
if (conn != null) {
conn.close();
}
System.out.println(err);
}
}
}

Loading…
Cancel
Save