main
黄海 2 years ago
parent abe34ba824
commit 3e86d59153

@ -136,13 +136,7 @@
<version>2.0.20</version>
</dependency>
<!--引用postrgesql-->
<!-- https://mvnrepository.com/artifact/org.postgresql/postgresql -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.6.0</version>
</dependency>
<!--引用poi-->
<dependency>
@ -278,18 +272,26 @@
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.opengauss/opengauss-jdbc -->
<dependency>
<groupId>org.opengauss</groupId>
<artifactId>opengauss-jdbc</artifactId>
<version>5.0.0</version>
</dependency>
<!--引用postrgesql-->
<!-- https://mvnrepository.com/artifact/org.postgresql/postgresql -->
<!-- <dependency>-->
<!-- <groupId>org.postgresql</groupId>-->
<!-- <artifactId>postgresql</artifactId>-->
<!-- <version>42.6.0</version>-->
<!-- </dependency>-->
<!--引入kafka-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
<version>2.1.3</version>
</dependency>
</dependencies>
<build>
<plugins>

@ -13,7 +13,7 @@ import java.sql.DriverManager;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class App {
public class OpenGaussReplication {
public static String SOURCEURL = "jdbc:postgresql://10.10.14.209:5432/ccdjzswd_db";
public static String USER = "postgres";
@ -31,8 +31,8 @@ public class App {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// props.put(ProducerConfig.ACKS_CONFIG, String.valueOf(REQUEST_REQUIRED_ACKS));
// props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
//props.put(ProducerConfig.ACKS_CONFIG, String.valueOf(REQUEST_REQUIRED_ACKS));
//props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer(props);
@ -45,7 +45,6 @@ public class App {
Class.forName("org.postgresql.Driver");
PgConnection conn = (PgConnection) DriverManager.getConnection(SOURCEURL, properties);
System.out.println("connection success!");
@ -70,23 +69,20 @@ public class App {
.start();
while (true) {
ByteBuffer byteBuffer = stream.readPending();
if (byteBuffer == null) {
TimeUnit.MILLISECONDS.sleep(10L);
continue;
}
int offset = byteBuffer.arrayOffset();
byte[] source = byteBuffer.array();
int length = source.length - offset;
String res = new String(source, offset, length);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, res);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, res);
kafkaProducer.send(record);
System.out.println("send ok ==> "+res);
System.out.println("send ok ==> " + res);
//如果需要flush lsn根据业务实际情况调用以下接口
Loading…
Cancel
Save