diff --git a/dsAiSupport/pom.xml b/dsAiSupport/pom.xml index 61e9a91a..65306de0 100644 --- a/dsAiSupport/pom.xml +++ b/dsAiSupport/pom.xml @@ -43,6 +43,11 @@ java-jwt 4.4.0 + + org.apache.rocketmq + rocketmq-client-java + 5.0.7 + org.apache.httpcomponents httpmime diff --git a/dsAiSupport/src/main/java/com/dsideal/aiSupport/AiSupportApplication.java b/dsAiSupport/src/main/java/com/dsideal/aiSupport/AiSupportApplication.java index 19da8b33..5f6f7429 100644 --- a/dsAiSupport/src/main/java/com/dsideal/aiSupport/AiSupportApplication.java +++ b/dsAiSupport/src/main/java/com/dsideal/aiSupport/AiSupportApplication.java @@ -1,8 +1,12 @@ package com.dsideal.aiSupport; +import com.dsideal.aiSupport.Config.RocketMQConfig; import com.dsideal.aiSupport.Index.IndexController; import com.dsideal.aiSupport.Interceptor.*; import com.dsideal.aiSupport.Plugin.YamlProp; +import com.dsideal.aiSupport.Test.MessageProducer; +import com.dsideal.aiSupport.Test.MessagePushConsumer; +import com.dsideal.aiSupport.Test.MyMessageListener; import com.dsideal.aiSupport.Util.FileUtil; import com.dsideal.aiSupport.Util.LogBackLogFactory; import com.jfinal.config.*; @@ -126,6 +130,8 @@ public class AiSupportApplication extends JFinalConfig { public void configHandler(Handlers me) { } + private MessageProducer messageProducer; + private MessagePushConsumer messagePushConsumer; /** * 在jfinal启动完成后马上执行 */ @@ -135,5 +141,20 @@ public class AiSupportApplication extends JFinalConfig { String path = AiSupportApplication.class.getClassLoader().getResource("logo.txt").getPath(); File file = new File(path); System.out.println(FileUtil.txt2String(file)); + + // 初始化RocketMQ生产者 + messageProducer = new MessageProducer(); + + // 初始化RocketMQ消费者 + MyMessageListener messageListener = new MyMessageListener(); + messagePushConsumer = new MessagePushConsumer( + "YourConsumerGroup", + RocketMQConfig.getDefaultTopic(), + "*", // 订阅所有Tag + messageListener + ); + + // 将生产者注入到全局对象中,方便在Controller中使用 + //Jboot.setJbootObject(MessageProducer.class.getName(), messageProducer); } } diff --git a/dsAiSupport/src/main/java/com/dsideal/aiSupport/Config/RocketMQConfig.java b/dsAiSupport/src/main/java/com/dsideal/aiSupport/Config/RocketMQConfig.java new file mode 100644 index 00000000..dd50de23 --- /dev/null +++ b/dsAiSupport/src/main/java/com/dsideal/aiSupport/Config/RocketMQConfig.java @@ -0,0 +1,33 @@ +package com.dsideal.aiSupport.Config; + +import org.apache.rocketmq.client.apis.ClientConfiguration; +import org.apache.rocketmq.client.apis.ClientConfigurationBuilder; +import org.apache.rocketmq.client.apis.ClientServiceProvider; + +public class RocketMQConfig { + // RocketMQ服务器地址 + private static final String ENDPOINT = "10.10.14.14:8081"; + // 默认主题 + private static final String DEFAULT_TOPIC = "TestTopic"; + + private static ClientServiceProvider provider; + private static ClientConfiguration configuration; + + static { + provider = ClientServiceProvider.loadService(); + ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(ENDPOINT); + configuration = builder.build(); + } + + public static ClientServiceProvider getProvider() { + return provider; + } + + public static ClientConfiguration getConfiguration() { + return configuration; + } + + public static String getDefaultTopic() { + return DEFAULT_TOPIC; + } +} \ No newline at end of file diff --git a/dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/MessageProducer.java b/dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/MessageProducer.java new file mode 100644 index 00000000..b2eac45e --- /dev/null +++ b/dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/MessageProducer.java @@ -0,0 +1,49 @@ +package com.dsideal.aiSupport.Test; + +import com.dsideal.aiSupport.Config.RocketMQConfig; +import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.apis.message.Message; +import org.apache.rocketmq.client.apis.producer.Producer; +import org.apache.rocketmq.client.apis.producer.SendReceipt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class MessageProducer { + private static final Logger logger = LoggerFactory.getLogger(MessageProducer.class); + private Producer producer; + + public MessageProducer() { + try { + producer = RocketMQConfig.getProvider().newProducerBuilder() + .setTopics(RocketMQConfig.getDefaultTopic()) + .setClientConfiguration(RocketMQConfig.getConfiguration()) + .build(); + } catch (ClientException e) { + logger.error("Failed to create producer", e); + } + } + + public void sendMessage(String topic, String tag, String key, String body) { + try { + Message message = RocketMQConfig.getProvider().newMessageBuilder() + .setTopic(topic) + .setKeys(key) + .setTag(tag) + .setBody(body.getBytes()) + .build(); + + SendReceipt sendReceipt = producer.send(message); + logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); + } catch (ClientException e) { + logger.error("Failed to send message", e); + } + } + + public void close() throws IOException { + if (producer != null) { + producer.close(); + } + } +} \ No newline at end of file diff --git a/dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/MessagePushConsumer.java b/dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/MessagePushConsumer.java new file mode 100644 index 00000000..f0fb46b9 --- /dev/null +++ b/dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/MessagePushConsumer.java @@ -0,0 +1,40 @@ +package com.dsideal.aiSupport.Test; + +import com.dsideal.aiSupport.Config.RocketMQConfig; +import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.apis.consumer.FilterExpression; +import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; +import org.apache.rocketmq.client.apis.consumer.MessageListener; +import org.apache.rocketmq.client.apis.consumer.PushConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; + +public class MessagePushConsumer { + private static final Logger logger = LoggerFactory.getLogger(MessagePushConsumer.class); + private PushConsumer pushConsumer; + + public MessagePushConsumer(String consumerGroup, String topic, String tag, MessageListener messageListener) { + try { + FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); + pushConsumer = RocketMQConfig.getProvider().newPushConsumerBuilder() + .setClientConfiguration(RocketMQConfig.getConfiguration()) + .setConsumerGroup(consumerGroup) + .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) + .setMessageListener(messageListener) + .build(); + + logger.info("Consumer started successfully"); + } catch (ClientException e) { + logger.error("Failed to create consumer", e); + } + } + + public void close() throws IOException { + if (pushConsumer != null) { + pushConsumer.close(); + } + } +} \ No newline at end of file diff --git a/dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/MyMessageListener.java b/dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/MyMessageListener.java new file mode 100644 index 00000000..9dcfafda --- /dev/null +++ b/dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/MyMessageListener.java @@ -0,0 +1,32 @@ +package com.dsideal.aiSupport.Test; + +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.consumer.MessageListener; +import org.apache.rocketmq.client.apis.message.MessageView; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MyMessageListener implements MessageListener { + private static final Logger logger = LoggerFactory.getLogger(MyMessageListener.class); + + @Override + public ConsumeResult consume(MessageView messageView) { + try { + String messageId = String.valueOf(messageView.getMessageId()); + String topic = messageView.getTopic(); + String tag = String.valueOf(messageView.getTag()); + byte[] body = messageView.getBody().array(); + String content = new String(body); + + logger.info("Received message: messageId={}, topic={}, tag={}, content={}", + messageId, topic, tag, content); + + // 处理消息的业务逻辑 + + return ConsumeResult.SUCCESS; + } catch (Exception e) { + logger.error("Failed to consume message", e); + return ConsumeResult.FAILURE; + } + } +} \ No newline at end of file