From 11f5226fe694152cc9241111e33735abd354712e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com>
Date: Thu, 22 May 2025 09:43:49 +0800
Subject: [PATCH] 'commit'
---
dsAiSupport/pom.xml | 5 ++
.../aiSupport/AiSupportApplication.java | 21 ++++++++
.../aiSupport/Config/RocketMQConfig.java | 33 +++++++++++++
.../aiSupport/Test/MessageProducer.java | 49 +++++++++++++++++++
.../aiSupport/Test/MessagePushConsumer.java | 40 +++++++++++++++
.../aiSupport/Test/MyMessageListener.java | 32 ++++++++++++
6 files changed, 180 insertions(+)
create mode 100644 dsAiSupport/src/main/java/com/dsideal/aiSupport/Config/RocketMQConfig.java
create mode 100644 dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/MessageProducer.java
create mode 100644 dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/MessagePushConsumer.java
create mode 100644 dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/MyMessageListener.java
diff --git a/dsAiSupport/pom.xml b/dsAiSupport/pom.xml
index 61e9a91a..65306de0 100644
--- a/dsAiSupport/pom.xml
+++ b/dsAiSupport/pom.xml
@@ -43,6 +43,11 @@
java-jwt
4.4.0
+
+ org.apache.rocketmq
+ rocketmq-client-java
+ 5.0.7
+
org.apache.httpcomponents
httpmime
diff --git a/dsAiSupport/src/main/java/com/dsideal/aiSupport/AiSupportApplication.java b/dsAiSupport/src/main/java/com/dsideal/aiSupport/AiSupportApplication.java
index 19da8b33..5f6f7429 100644
--- a/dsAiSupport/src/main/java/com/dsideal/aiSupport/AiSupportApplication.java
+++ b/dsAiSupport/src/main/java/com/dsideal/aiSupport/AiSupportApplication.java
@@ -1,8 +1,12 @@
package com.dsideal.aiSupport;
+import com.dsideal.aiSupport.Config.RocketMQConfig;
import com.dsideal.aiSupport.Index.IndexController;
import com.dsideal.aiSupport.Interceptor.*;
import com.dsideal.aiSupport.Plugin.YamlProp;
+import com.dsideal.aiSupport.Test.MessageProducer;
+import com.dsideal.aiSupport.Test.MessagePushConsumer;
+import com.dsideal.aiSupport.Test.MyMessageListener;
import com.dsideal.aiSupport.Util.FileUtil;
import com.dsideal.aiSupport.Util.LogBackLogFactory;
import com.jfinal.config.*;
@@ -126,6 +130,8 @@ public class AiSupportApplication extends JFinalConfig {
public void configHandler(Handlers me) {
}
+ private MessageProducer messageProducer;
+ private MessagePushConsumer messagePushConsumer;
/**
* 在jfinal启动完成后马上执行
*/
@@ -135,5 +141,20 @@ public class AiSupportApplication extends JFinalConfig {
String path = AiSupportApplication.class.getClassLoader().getResource("logo.txt").getPath();
File file = new File(path);
System.out.println(FileUtil.txt2String(file));
+
+ // 初始化RocketMQ生产者
+ messageProducer = new MessageProducer();
+
+ // 初始化RocketMQ消费者
+ MyMessageListener messageListener = new MyMessageListener();
+ messagePushConsumer = new MessagePushConsumer(
+ "YourConsumerGroup",
+ RocketMQConfig.getDefaultTopic(),
+ "*", // 订阅所有Tag
+ messageListener
+ );
+
+ // 将生产者注入到全局对象中,方便在Controller中使用
+ //Jboot.setJbootObject(MessageProducer.class.getName(), messageProducer);
}
}
diff --git a/dsAiSupport/src/main/java/com/dsideal/aiSupport/Config/RocketMQConfig.java b/dsAiSupport/src/main/java/com/dsideal/aiSupport/Config/RocketMQConfig.java
new file mode 100644
index 00000000..dd50de23
--- /dev/null
+++ b/dsAiSupport/src/main/java/com/dsideal/aiSupport/Config/RocketMQConfig.java
@@ -0,0 +1,33 @@
+package com.dsideal.aiSupport.Config;
+
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+
+public class RocketMQConfig {
+ // RocketMQ服务器地址
+ private static final String ENDPOINT = "10.10.14.14:8081";
+ // 默认主题
+ private static final String DEFAULT_TOPIC = "TestTopic";
+
+ private static ClientServiceProvider provider;
+ private static ClientConfiguration configuration;
+
+ static {
+ provider = ClientServiceProvider.loadService();
+ ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(ENDPOINT);
+ configuration = builder.build();
+ }
+
+ public static ClientServiceProvider getProvider() {
+ return provider;
+ }
+
+ public static ClientConfiguration getConfiguration() {
+ return configuration;
+ }
+
+ public static String getDefaultTopic() {
+ return DEFAULT_TOPIC;
+ }
+}
\ No newline at end of file
diff --git a/dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/MessageProducer.java b/dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/MessageProducer.java
new file mode 100644
index 00000000..b2eac45e
--- /dev/null
+++ b/dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/MessageProducer.java
@@ -0,0 +1,49 @@
+package com.dsideal.aiSupport.Test;
+
+import com.dsideal.aiSupport.Config.RocketMQConfig;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class MessageProducer {
+ private static final Logger logger = LoggerFactory.getLogger(MessageProducer.class);
+ private Producer producer;
+
+ public MessageProducer() {
+ try {
+ producer = RocketMQConfig.getProvider().newProducerBuilder()
+ .setTopics(RocketMQConfig.getDefaultTopic())
+ .setClientConfiguration(RocketMQConfig.getConfiguration())
+ .build();
+ } catch (ClientException e) {
+ logger.error("Failed to create producer", e);
+ }
+ }
+
+ public void sendMessage(String topic, String tag, String key, String body) {
+ try {
+ Message message = RocketMQConfig.getProvider().newMessageBuilder()
+ .setTopic(topic)
+ .setKeys(key)
+ .setTag(tag)
+ .setBody(body.getBytes())
+ .build();
+
+ SendReceipt sendReceipt = producer.send(message);
+ logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
+ } catch (ClientException e) {
+ logger.error("Failed to send message", e);
+ }
+ }
+
+ public void close() throws IOException {
+ if (producer != null) {
+ producer.close();
+ }
+ }
+}
\ No newline at end of file
diff --git a/dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/MessagePushConsumer.java b/dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/MessagePushConsumer.java
new file mode 100644
index 00000000..f0fb46b9
--- /dev/null
+++ b/dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/MessagePushConsumer.java
@@ -0,0 +1,40 @@
+package com.dsideal.aiSupport.Test;
+
+import com.dsideal.aiSupport.Config.RocketMQConfig;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
+import org.apache.rocketmq.client.apis.consumer.MessageListener;
+import org.apache.rocketmq.client.apis.consumer.PushConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+
+public class MessagePushConsumer {
+ private static final Logger logger = LoggerFactory.getLogger(MessagePushConsumer.class);
+ private PushConsumer pushConsumer;
+
+ public MessagePushConsumer(String consumerGroup, String topic, String tag, MessageListener messageListener) {
+ try {
+ FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
+ pushConsumer = RocketMQConfig.getProvider().newPushConsumerBuilder()
+ .setClientConfiguration(RocketMQConfig.getConfiguration())
+ .setConsumerGroup(consumerGroup)
+ .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
+ .setMessageListener(messageListener)
+ .build();
+
+ logger.info("Consumer started successfully");
+ } catch (ClientException e) {
+ logger.error("Failed to create consumer", e);
+ }
+ }
+
+ public void close() throws IOException {
+ if (pushConsumer != null) {
+ pushConsumer.close();
+ }
+ }
+}
\ No newline at end of file
diff --git a/dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/MyMessageListener.java b/dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/MyMessageListener.java
new file mode 100644
index 00000000..9dcfafda
--- /dev/null
+++ b/dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/MyMessageListener.java
@@ -0,0 +1,32 @@
+package com.dsideal.aiSupport.Test;
+
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.MessageListener;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MyMessageListener implements MessageListener {
+ private static final Logger logger = LoggerFactory.getLogger(MyMessageListener.class);
+
+ @Override
+ public ConsumeResult consume(MessageView messageView) {
+ try {
+ String messageId = String.valueOf(messageView.getMessageId());
+ String topic = messageView.getTopic();
+ String tag = String.valueOf(messageView.getTag());
+ byte[] body = messageView.getBody().array();
+ String content = new String(body);
+
+ logger.info("Received message: messageId={}, topic={}, tag={}, content={}",
+ messageId, topic, tag, content);
+
+ // 处理消息的业务逻辑
+
+ return ConsumeResult.SUCCESS;
+ } catch (Exception e) {
+ logger.error("Failed to consume message", e);
+ return ConsumeResult.FAILURE;
+ }
+ }
+}
\ No newline at end of file