diff --git a/pom.xml b/pom.xml index f9e804d..7413ee3 100644 --- a/pom.xml +++ b/pom.xml @@ -136,13 +136,7 @@ 2.0.20 - - - - org.postgresql - postgresql - 42.6.0 - + @@ -278,18 +272,26 @@ rocketmq-client 4.9.4 + + + org.opengauss + opengauss-jdbc + 5.0.0 + + + + + + + + org.apache.kafka kafka-clients 3.4.0 - - org.apache.mina - mina-core - 2.1.3 - diff --git a/src/main/java/UnitTest/App.java b/src/main/java/UnitTest/OpenGaussReplication.java similarity index 91% rename from src/main/java/UnitTest/App.java rename to src/main/java/UnitTest/OpenGaussReplication.java index 4e156b5..41ea13c 100644 --- a/src/main/java/UnitTest/App.java +++ b/src/main/java/UnitTest/OpenGaussReplication.java @@ -13,7 +13,7 @@ import java.sql.DriverManager; import java.util.Properties; import java.util.concurrent.TimeUnit; -public class App { +public class OpenGaussReplication { public static String SOURCEURL = "jdbc:postgresql://10.10.14.209:5432/ccdjzswd_db"; public static String USER = "postgres"; @@ -31,8 +31,8 @@ public class App { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); -// props.put(ProducerConfig.ACKS_CONFIG, String.valueOf(REQUEST_REQUIRED_ACKS)); -// props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID); + //props.put(ProducerConfig.ACKS_CONFIG, String.valueOf(REQUEST_REQUIRED_ACKS)); + //props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID); KafkaProducer kafkaProducer = new KafkaProducer(props); @@ -45,7 +45,6 @@ public class App { Class.forName("org.postgresql.Driver"); - PgConnection conn = (PgConnection) DriverManager.getConnection(SOURCEURL, properties); System.out.println("connection success!"); @@ -70,23 +69,20 @@ public class App { .start(); while (true) { - ByteBuffer byteBuffer = stream.readPending(); - if (byteBuffer == null) { TimeUnit.MILLISECONDS.sleep(10L); continue; } - int offset = byteBuffer.arrayOffset(); byte[] source = byteBuffer.array(); int length = source.length - offset; String res = new String(source, offset, length); - ProducerRecord record = new ProducerRecord(TOPIC, res); + ProducerRecord record = new ProducerRecord<>(TOPIC, res); kafkaProducer.send(record); - System.out.println("send ok ==> "+res); + System.out.println("send ok ==> " + res); //如果需要flush lsn,根据业务实际情况调用以下接口