package UnitTest; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.postgresql.PGProperty; import org.postgresql.jdbc.PgConnection; import org.postgresql.replication.LogSequenceNumber; import org.postgresql.replication.PGReplicationStream; import java.nio.ByteBuffer; import java.sql.DriverManager; import java.util.Properties; import java.util.concurrent.TimeUnit; public class App { public static String SOURCEURL = "jdbc:postgresql://10.10.14.209:5432/ccdjzswd_db"; public static String USER = "postgres"; public static String PASSWD = "DsideaL147258368"; public static String TOPIC = "pg_test";//定义主题 public static final String BROKERS_ADDRESS = "10.10.14.67:9094"; // public static final int REQUEST_REQUIRED_ACKS = 1; // public static final String CLIENT_ID = "producer_test_id"; public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // props.put(ProducerConfig.ACKS_CONFIG, String.valueOf(REQUEST_REQUIRED_ACKS)); // props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID); KafkaProducer kafkaProducer = new KafkaProducer(props); Properties properties = new Properties(); PGProperty.USER.set(properties, USER); PGProperty.PASSWORD.set(properties, PASSWD); PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4"); PGProperty.REPLICATION.set(properties, "database"); PGProperty.PREFER_QUERY_MODE.set(properties, "simple"); Class.forName("org.postgresql.Driver"); PgConnection conn = (PgConnection) DriverManager.getConnection(SOURCEURL, properties); System.out.println("connection success!"); String slotName = "replication_slot"; String lsn = "22DBF70"; LogSequenceNumber waitLSN = LogSequenceNumber.valueOf(lsn); PGReplicationStream stream = conn .getReplicationAPI() .replicationStream() .logical() .withSlotName(slotName) .withSlotOption("include-xids", false) .withSlotOption("skip-empty-xacts", true) .withStartPosition(waitLSN) // .withSlotOption("parallel-decode-num", 10) //解;解码线程并发度 // .withSlotOption("white-table-list", "public.logic_test") //白名单列表 // .withSlotOption("standby-connection", true) //强制备机解码 // .withSlotOption("decode-style", "t") //解码格式 // .withSlotOption("sending-bacth", 1) //批量发送解码结果 .start(); while (true) { ByteBuffer byteBuffer = stream.readPending(); if (byteBuffer == null) { TimeUnit.MILLISECONDS.sleep(10L); continue; } int offset = byteBuffer.arrayOffset(); byte[] source = byteBuffer.array(); int length = source.length - offset; String res = new String(source, offset, length); ProducerRecord record = new ProducerRecord(TOPIC, res); kafkaProducer.send(record); System.out.println("send ok ==> "+res); //如果需要flush lsn,根据业务实际情况调用以下接口 LogSequenceNumber lastRecv = stream.getLastReceiveLSN(); System.out.println(lastRecv); // stream.setFlushedLSN(lastRecv); // stream.forceUpdateStatus(); } } }