main
黄海 2 years ago
parent babdf8ade4
commit 16e9a7f0d8

@ -215,23 +215,7 @@
<artifactId>httpmime</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-ram</artifactId>
<version>3.3.0</version>
</dependency>
<!--阿里大鱼-->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.6.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.aliyun/aliyun-java-sdk-dysmsapi -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-dysmsapi</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
@ -265,27 +249,13 @@
<artifactId>slf4j-simple</artifactId>
<version>1.7.36</version>
</dependency>
<!--测试RocketMq-->
<!--https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client/4.9.2-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.opengauss/opengauss-jdbc -->
<dependency>
<groupId>org.opengauss</groupId>
<artifactId>opengauss-jdbc</artifactId>
<version>5.0.0</version>
</dependency>
<!--引用postrgesql-->
<!-- https://mvnrepository.com/artifact/org.postgresql/postgresql -->
<!-- <dependency>-->
<!-- <groupId>org.postgresql</groupId>-->
<!-- <artifactId>postgresql</artifactId>-->
<!-- <version>42.6.0</version>-->
<!-- </dependency>-->
<!--引入kafka-->
<dependency>
<groupId>org.apache.kafka</groupId>

@ -1,7 +1,5 @@
package UnitTest;
import com.dsideal.FengHuang.Util.CommonUtil;
import com.jfinal.kit.StrKit;
import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.Record;
@ -17,7 +15,8 @@ import org.postgresql.replication.PGReplicationStream;
import java.nio.ByteBuffer;
import java.sql.DriverManager;
import java.util.*;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class OpenGaussReplicationToKafka {

@ -1,24 +1,20 @@
package UnitTest;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
/**
* @Date 2021-04-20 15:47
*/
public class RocketMqConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("default_consumer_group");
consumer.setNamesrvAddr("10.10.14.231:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicXuanKe", "*");//订阅topic和tag设置
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
String body = new String(msgs.get(0).getBody());
System.out.printf("消息内容:%s%n", body);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
// DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("default_consumer_group");
// consumer.setNamesrvAddr("10.10.14.231:9876");
// consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// consumer.subscribe("TopicXuanKe", "*");//订阅topic和tag设置
// consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// String body = new String(msgs.get(0).getBody());
// System.out.printf("消息内容:%s%n", body);
// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// });
// consumer.start();
}
}

@ -1,9 +1,4 @@
package UnitTest;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.UUID;
/**
* @Date 2021-04-20 15:40
@ -11,17 +6,17 @@ import java.util.UUID;
public class RocketMqProducer {
public static void main(String[] args) throws Exception {
//声明发送者
DefaultMQProducer producer = new DefaultMQProducer("default_producter_group");
producer.setNamesrvAddr("10.10.14.231:9876");
producer.start();
//发送
String body = "你好 RocketMQ";
Message message = new Message("TopicXuanKe", null, body.getBytes());
message.setKeys(UUID.randomUUID().toString());
SendResult sendResult = producer.send(message);
System.out.printf("发送结果:%s%n", sendResult);
//关闭生产者
producer.shutdown();
// //声明发送者
// DefaultMQProducer producer = new DefaultMQProducer("default_producter_group");
// producer.setNamesrvAddr("10.10.14.231:9876");
// producer.start();
// //发送
// String body = "你好 RocketMQ";
// Message message = new Message("TopicXuanKe", null, body.getBytes());
// message.setKeys(UUID.randomUUID().toString());
// SendResult sendResult = producer.send(message);
// System.out.printf("发送结果:%s%n", sendResult);
// //关闭生产者
// producer.shutdown();
}
}
Loading…
Cancel
Save