|
|
|
@ -20,10 +20,12 @@ public class OpenGaussReplication {
|
|
|
|
|
public static String PASSWD = "DsideaL147258369";
|
|
|
|
|
|
|
|
|
|
public static String TOPIC = "pg_test";//定义主题
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
SELECT * FROM pg_create_logical_replication_slot('slot2', 'mppdb_decoding');
|
|
|
|
|
insert into test(id,txt) values(10,'hhhh');
|
|
|
|
|
SELECT * FROM pg_logical_slot_peek_changes('slot2', NULL, 4096);
|
|
|
|
|
*/
|
|
|
|
|
public static final String BROKERS_ADDRESS = "10.10.14.67:9092";
|
|
|
|
|
// public static final int REQUEST_REQUIRED_ACKS = 1;
|
|
|
|
|
// public static final String CLIENT_ID = "producer_test_id";
|
|
|
|
|
|
|
|
|
|
public static void main(String[] args) throws Exception {
|
|
|
|
|
|
|
|
|
@ -31,11 +33,8 @@ public class OpenGaussReplication {
|
|
|
|
|
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS);
|
|
|
|
|
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
|
|
|
|
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
|
|
|
|
//props.put(ProducerConfig.ACKS_CONFIG, String.valueOf(REQUEST_REQUIRED_ACKS));
|
|
|
|
|
//props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
|
|
|
|
|
KafkaProducer<String, String> kafkaProducer = new KafkaProducer(props);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Properties properties = new Properties();
|
|
|
|
|
PGProperty.USER.set(properties, USER);
|
|
|
|
|
PGProperty.PASSWORD.set(properties, PASSWD);
|
|
|
|
@ -58,8 +57,8 @@ public class OpenGaussReplication {
|
|
|
|
|
.replicationStream()
|
|
|
|
|
.logical()
|
|
|
|
|
.withSlotName(slotName)
|
|
|
|
|
//.withSlotOption("include-xids", false)
|
|
|
|
|
// .withSlotOption("skip-empty-xacts", true)
|
|
|
|
|
.withSlotOption("include-xids", false)
|
|
|
|
|
.withSlotOption("skip-empty-xacts", true)
|
|
|
|
|
.withStartPosition(waitLSN)
|
|
|
|
|
.start();
|
|
|
|
|
|
|
|
|
|