package UnitTest; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.postgresql.PGProperty; import org.postgresql.jdbc.PgConnection; import org.postgresql.replication.LogSequenceNumber; import org.postgresql.replication.PGReplicationStream; import java.nio.ByteBuffer; import java.sql.DriverManager; import java.util.Properties; import java.util.concurrent.TimeUnit; public class OpenGaussReplication { public static String SOURCEURL = "jdbc:postgresql://10.10.14.61:15400/test_db"; public static String USER = "postgres"; public static String PASSWD = "DsideaL147258369"; public static String TOPIC = "pg_test";//定义主题 public static final String BROKERS_ADDRESS = "10.10.14.67:9092"; // public static final int REQUEST_REQUIRED_ACKS = 1; // public static final String CLIENT_ID = "producer_test_id"; public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //props.put(ProducerConfig.ACKS_CONFIG, String.valueOf(REQUEST_REQUIRED_ACKS)); //props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID); KafkaProducer kafkaProducer = new KafkaProducer(props); Properties properties = new Properties(); PGProperty.USER.set(properties, USER); PGProperty.PASSWORD.set(properties, PASSWD); PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4"); PGProperty.REPLICATION.set(properties, "database"); PGProperty.PREFER_QUERY_MODE.set(properties, "simple"); Class.forName("org.postgresql.Driver"); PgConnection conn = (PgConnection) DriverManager.getConnection(SOURCEURL, properties); System.out.println("connection success!"); String slotName = "slot2"; String lsn = "403ED30"; LogSequenceNumber waitLSN = LogSequenceNumber.valueOf(lsn); PGReplicationStream stream = conn .getReplicationAPI() .replicationStream() .logical() .withSlotName(slotName) //.withSlotOption("include-xids", false) // .withSlotOption("skip-empty-xacts", true) .withStartPosition(waitLSN) .start(); while (true) { ByteBuffer byteBuffer = stream.readPending(); if (byteBuffer == null) { TimeUnit.MILLISECONDS.sleep(10L); continue; } int offset = byteBuffer.arrayOffset(); byte[] source = byteBuffer.array(); int length = source.length - offset; String res = new String(source, offset, length); ProducerRecord record = new ProducerRecord<>(TOPIC, res); kafkaProducer.send(record); System.out.println("send ok ==> " + res); //如果需要flush lsn,根据业务实际情况调用以下接口 LogSequenceNumber lastRecv = stream.getLastReceiveLSN(); System.out.println(lastRecv); // stream.setFlushedLSN(lastRecv); // stream.forceUpdateStatus(); } } }