diff --git a/src/main/java/UnitTest/OpenGaussReplication.java b/src/main/java/UnitTest/OpenGaussReplication.java index 6fb32b5..0084c2a 100644 --- a/src/main/java/UnitTest/OpenGaussReplication.java +++ b/src/main/java/UnitTest/OpenGaussReplication.java @@ -20,10 +20,12 @@ public class OpenGaussReplication { public static String PASSWD = "DsideaL147258369"; public static String TOPIC = "pg_test";//定义主题 - + /* + SELECT * FROM pg_create_logical_replication_slot('slot2', 'mppdb_decoding'); + insert into test(id,txt) values(10,'hhhh'); + SELECT * FROM pg_logical_slot_peek_changes('slot2', NULL, 4096); + */ public static final String BROKERS_ADDRESS = "10.10.14.67:9092"; -// public static final int REQUEST_REQUIRED_ACKS = 1; -// public static final String CLIENT_ID = "producer_test_id"; public static void main(String[] args) throws Exception { @@ -31,11 +33,8 @@ public class OpenGaussReplication { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - //props.put(ProducerConfig.ACKS_CONFIG, String.valueOf(REQUEST_REQUIRED_ACKS)); - //props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID); KafkaProducer kafkaProducer = new KafkaProducer(props); - Properties properties = new Properties(); PGProperty.USER.set(properties, USER); PGProperty.PASSWORD.set(properties, PASSWD); @@ -58,8 +57,8 @@ public class OpenGaussReplication { .replicationStream() .logical() .withSlotName(slotName) - //.withSlotOption("include-xids", false) - // .withSlotOption("skip-empty-xacts", true) + .withSlotOption("include-xids", false) + .withSlotOption("skip-empty-xacts", true) .withStartPosition(waitLSN) .start();