package UnitTest;

import com.alibaba.fastjson.JSONObject;
import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.Record;
import com.jfinal.plugin.activerecord.dialect.PostgreSqlDialect;
import com.jfinal.plugin.druid.DruidPlugin;
import com.jfinal.plugin.redis.Redis;
import com.jfinal.plugin.redis.RedisPlugin;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.postgresql.PGProperty;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;
import java.nio.ByteBuffer;
import java.sql.DriverManager;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class OpenGaussReplicationToKafka {
    public static String SOURCE_URL = "jdbc:postgresql://10.10.14.61:15400/test_db";
    public static String USER = "postgres";
    public static String PASSWD = "DsideaL147258369";
    public static String DRIVER_CLASS = "org.postgresql.Driver";
    public static String TOPIC = "pg_test"; // Kafka topic to publish change records to
    // Kafka
    public static final String BROKERS_ADDRESS = "10.10.14.67:9092";
    // Redis
    public static final String REDIS_HOST = "10.10.14.199";
    public static final int REDIS_PORT = 18890;
    public static final int EXPIRE_SECONDS = 3600 * 24 * 3;
    // Replication slot name
    public static final String slotName = "slot_huanghai";
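
    // Manually start the JFinal plugins (Druid pool, ActiveRecord, Redis) for standalone use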
    public static void Init() {
        // Source database
        DruidPlugin druid = new DruidPlugin(SOURCE_URL, USER, PASSWD, DRIVER_CLASS);
        druid.start();
        ActiveRecordPlugin arp = new ActiveRecordPlugin(druid);
        arp.setDialect(new PostgreSqlDialect());
        arp.start();
        // Redis service used by the cache module
        RedisPlugin redis = new RedisPlugin("myRedis", REDIS_HOST, REDIS_PORT, 10 * 1000);
        redis.start();
    }
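
    // Create a logical replication slot; mppdb_decoding is the JSON logical decoding plugin shipped with openGauss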
    public static void CreateSlot(String slotName) {
        String sql = "select * from pg_create_logical_replication_slot(?, 'mppdb_decoding')";
        Db.find(sql, slotName);
    }
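
    // Drop the slot; errors such as "slot does not exist" are only logged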
    public static void DeleteSlot(String slotName) {
        try {
            String sql = "select pg_drop_replication_slot(?)";
            Db.find(sql, slotName);
        } catch (Exception err) {
            System.out.println(err);
        }
    }
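
    // Print all replication slots on the server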
    public static void ListSlot() {
        String sql = "select * from pg_replication_slots";
        List<Record> list = Db.find(sql);
        System.out.println(list);
    }
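
    // Return the slot's restart_lsn (the position to resume reading from), or null if the slot does not exist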
    public static String GetRestartLsn(String slotName) {
        String sql = "select restart_lsn from pg_replication_slots where slot_name=?";
        List<Record> list = Db.find(sql, slotName);
        if (list.size() > 0) return list.get(0).getStr("restart_lsn");
        return null;
    }
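
    // Insert one row into the test table with an incremented id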
    public static void InsertTestData() {
        String sql = "select max(id) as maxid from test";
        int maxId = 0;
        Record record = Db.findFirst(sql);
        if (record.get("maxid") != null) maxId = record.getInt("maxid");
        maxId = maxId + 1;
        sql = "insert into test(id,txt) values(?,?)";
        Db.update(sql, maxId, "Huang Hai's test data: " + maxId);
    }
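
    // Empty the test table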
    public static void TruncateTable() {
        String sql = "truncate table test";
        Db.update(sql);
    }
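
    // Read logical decoding output from the slot and publish every change to Kafka;
    // Redis keys of the form LSN_<position> mark LSNs that were already sent, so
    // restarting the program does not produce duplicates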
    public static void main(String[] args) throws Exception {
        // Initialize the database connections
        Init();
        // Drop the slot
        // DeleteSlot(slotName);
        // Create the slot
        // CreateSlot(slotName);
        // List the slots
        // ListSlot();
        // TruncateTable();
        // Insert test data
        // InsertTestData();
        // Get the last confirmed read offset (restart_lsn) of the slot
        String lsn = GetRestartLsn(slotName);
        if (lsn == null) {
            System.out.println("Replication slot " + slotName + " does not exist, create it first!");
            return;
        }
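
        // Kafka producer that publishes each decoded change as a JSON string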
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
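
        // Properties for a logical replication connection: REPLICATION=database,
        // simple query mode, and an assumed minimum server version of 9.4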
        Properties properties = new Properties();
        PGProperty.USER.set(properties, USER);
        PGProperty.PASSWORD.set(properties, PASSWD);
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
        PGProperty.REPLICATION.set(properties, "database");
        PGProperty.PREFER_QUERY_MODE.set(properties, "simple");
        Class.forName(DRIVER_CLASS);
        PgConnection conn = (PgConnection) DriverManager.getConnection(SOURCE_URL, properties);
        System.out.println("connection success!");
        LogSequenceNumber waitLSN = LogSequenceNumber.valueOf(lsn);
        PGReplicationStream stream = conn
                .getReplicationAPI()
                .replicationStream()
                .logical()
                .withSlotName(slotName)
                .withStatusInterval(10, TimeUnit.SECONDS)
                .withSlotOption("include-xids", false)
                .withSlotOption("skip-empty-xacts", true)
                .withStartPosition(waitLSN)
                .start();
        System.out.println("Starting LSN for this round: " + lsn);
        try {
            while (true) {
                // readPending() is non-blocking and returns null when no data is available
                ByteBuffer byteBuffer = stream.readPending();
                if (byteBuffer == null) {
                    TimeUnit.MILLISECONDS.sleep(10L);
                    continue;
                }
                LogSequenceNumber nowLsn = stream.getLastReceiveLSN();
                String key = "LSN_" + nowLsn.asLong();
                if (Redis.use().exists(key)) {
                    System.out.println("LSN " + key + " was already processed, skipping this record instead of sending it to Kafka!");
                    stream.setAppliedLSN(nowLsn);
                    stream.setFlushedLSN(nowLsn);
                    continue;
                }
                int offset = byteBuffer.arrayOffset();
                byte[] source = byteBuffer.array();
                int length = source.length - offset;
                String res = new String(source, offset, length);
                // Transaction boundary messages carry no row data
                if (res.equals("BEGIN")) continue;
                if (res.startsWith("COMMIT")) continue;
                JSONObject jo = JSONObject.parseObject(res);
                jo.put("LSN", key);
                res = jo.toString();
                ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, res);
                kafkaProducer.send(record);
                System.out.println("send ok ==> " + res);
                // Acknowledge the LSN so the server can advance the slot and recycle WAL
                stream.setAppliedLSN(nowLsn);
                stream.setFlushedLSN(nowLsn);
                stream.forceUpdateStatus();
                // Record this LSN in Redis to avoid resending after a restart
                Redis.use().setex(key, EXPIRE_SECONDS, 1);
            }
        } catch (Exception err) {
            System.out.println(err);
        }
    }
}
