main
黄海 2 months ago
parent 2318ccace5
commit 11f5226fe6

@ -43,6 +43,11 @@
<artifactId>java-jwt</artifactId>
<version>4.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.7</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>

@ -1,8 +1,12 @@
package com.dsideal.aiSupport;
import com.dsideal.aiSupport.Config.RocketMQConfig;
import com.dsideal.aiSupport.Index.IndexController;
import com.dsideal.aiSupport.Interceptor.*;
import com.dsideal.aiSupport.Plugin.YamlProp;
import com.dsideal.aiSupport.Test.MessageProducer;
import com.dsideal.aiSupport.Test.MessagePushConsumer;
import com.dsideal.aiSupport.Test.MyMessageListener;
import com.dsideal.aiSupport.Util.FileUtil;
import com.dsideal.aiSupport.Util.LogBackLogFactory;
import com.jfinal.config.*;
@ -126,6 +130,8 @@ public class AiSupportApplication extends JFinalConfig {
public void configHandler(Handlers me) {
}
private MessageProducer messageProducer;
private MessagePushConsumer messagePushConsumer;
/**
* jfinal
*/
@ -135,5 +141,20 @@ public class AiSupportApplication extends JFinalConfig {
String path = AiSupportApplication.class.getClassLoader().getResource("logo.txt").getPath();
File file = new File(path);
System.out.println(FileUtil.txt2String(file));
// 初始化RocketMQ生产者
messageProducer = new MessageProducer();
// 初始化RocketMQ消费者
MyMessageListener messageListener = new MyMessageListener();
messagePushConsumer = new MessagePushConsumer(
"YourConsumerGroup",
RocketMQConfig.getDefaultTopic(),
"*", // 订阅所有Tag
messageListener
);
// 将生产者注入到全局对象中方便在Controller中使用
//Jboot.setJbootObject(MessageProducer.class.getName(), messageProducer);
}
}

@ -0,0 +1,33 @@
package com.dsideal.aiSupport.Config;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
public class RocketMQConfig {
// RocketMQ服务器地址
private static final String ENDPOINT = "10.10.14.14:8081";
// 默认主题
private static final String DEFAULT_TOPIC = "TestTopic";
private static ClientServiceProvider provider;
private static ClientConfiguration configuration;
static {
provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(ENDPOINT);
configuration = builder.build();
}
public static ClientServiceProvider getProvider() {
return provider;
}
public static ClientConfiguration getConfiguration() {
return configuration;
}
public static String getDefaultTopic() {
return DEFAULT_TOPIC;
}
}

@ -0,0 +1,49 @@
package com.dsideal.aiSupport.Test;
import com.dsideal.aiSupport.Config.RocketMQConfig;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class MessageProducer {
private static final Logger logger = LoggerFactory.getLogger(MessageProducer.class);
private Producer producer;
public MessageProducer() {
try {
producer = RocketMQConfig.getProvider().newProducerBuilder()
.setTopics(RocketMQConfig.getDefaultTopic())
.setClientConfiguration(RocketMQConfig.getConfiguration())
.build();
} catch (ClientException e) {
logger.error("Failed to create producer", e);
}
}
public void sendMessage(String topic, String tag, String key, String body) {
try {
Message message = RocketMQConfig.getProvider().newMessageBuilder()
.setTopic(topic)
.setKeys(key)
.setTag(tag)
.setBody(body.getBytes())
.build();
SendReceipt sendReceipt = producer.send(message);
logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (ClientException e) {
logger.error("Failed to send message", e);
}
}
public void close() throws IOException {
if (producer != null) {
producer.close();
}
}
}

@ -0,0 +1,40 @@
package com.dsideal.aiSupport.Test;
import com.dsideal.aiSupport.Config.RocketMQConfig;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
public class MessagePushConsumer {
private static final Logger logger = LoggerFactory.getLogger(MessagePushConsumer.class);
private PushConsumer pushConsumer;
public MessagePushConsumer(String consumerGroup, String topic, String tag, MessageListener messageListener) {
try {
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
pushConsumer = RocketMQConfig.getProvider().newPushConsumerBuilder()
.setClientConfiguration(RocketMQConfig.getConfiguration())
.setConsumerGroup(consumerGroup)
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.setMessageListener(messageListener)
.build();
logger.info("Consumer started successfully");
} catch (ClientException e) {
logger.error("Failed to create consumer", e);
}
}
public void close() throws IOException {
if (pushConsumer != null) {
pushConsumer.close();
}
}
}

@ -0,0 +1,32 @@
package com.dsideal.aiSupport.Test;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyMessageListener implements MessageListener {
private static final Logger logger = LoggerFactory.getLogger(MyMessageListener.class);
@Override
public ConsumeResult consume(MessageView messageView) {
try {
String messageId = String.valueOf(messageView.getMessageId());
String topic = messageView.getTopic();
String tag = String.valueOf(messageView.getTag());
byte[] body = messageView.getBody().array();
String content = new String(body);
logger.info("Received message: messageId={}, topic={}, tag={}, content={}",
messageId, topic, tag, content);
// 处理消息的业务逻辑
return ConsumeResult.SUCCESS;
} catch (Exception e) {
logger.error("Failed to consume message", e);
return ConsumeResult.FAILURE;
}
}
}
Loading…
Cancel
Save