You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

275 lines
10 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package com.dsideal.Ai.Util;
import com.dsideal.Config.PropKit;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
/**
* RocketMQ工具类提供消息发送和消费的常用方法
* 包含同步发送、异步发送、顺序消息发送和消息消费等功能
*/
public class RocketMqKit {
private static final Logger logger = LoggerFactory.getLogger(RocketMqKit.class);
private static volatile DefaultMQProducer producer;
private static volatile DefaultLitePullConsumer consumer;
private static final Object producerLock = new Object();
private static final Object consumerLock = new Object();
// RocketMQ服务器地址
private final static String nameServer = PropKit.get("RocketMq.nameServer");
// 生产者组名
private final static String producerGroup = "my_group";
// 消费者组名
private final static String consumerGroup = "my_group";
// 主题名称
private final static String topic = PropKit.get("RocketMq.topic");
// 获取Producer实例
public static DefaultMQProducer getProducer() {
if (producer == null) {
synchronized (producerLock) {
if (producer == null) {
producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(nameServer);
try {
producer.start();
} catch (Exception e) {
throw new RuntimeException("初始化Producer失败", e);
}
}
}
}
return producer;
}
// 获取Consumer实例
public static DefaultLitePullConsumer getConsumer() {
if (consumer == null) {
synchronized (consumerLock) {
if (consumer == null) {
consumer = new DefaultLitePullConsumer(consumerGroup);
consumer.setNamesrvAddr(nameServer);
consumer.setAutoCommit(false);
try {
consumer.subscribe(topic, "*");
consumer.setPullBatchSize(20);
consumer.start();
} catch (Exception e) {
throw new RuntimeException("初始化Consumer失败", e);
}
}
}
}
return consumer;
}
// 关闭资源的方法
public static void shutdown() {
if (producer != null) {
producer.shutdown();
}
if (consumer != null) {
consumer.shutdown();
}
}
// 发送消息的方法
public static SendResult sendMessage(String tag, String key, String body) throws Exception {
Message msg = new Message(topic, tag, key, body.getBytes(RemotingHelper.DEFAULT_CHARSET));
return getProducer().send(msg);
}
// 消费消息的方法
public static List<MessageExt> pullMessages() {
return getConsumer().poll();
}
/**
* 同步发送消息
* 发送完消息后等待服务器响应结果
*/
public static void syncSend() {
DefaultMQProducer producer = null;
try {
// 初始化生产者
producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(nameServer);
producer.start();
// 创建消息对象
Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息并等待结果
SendResult sendResult = producer.send(msg, 10000);
System.out.printf("同步发送结果:%s%n", sendResult);
} catch (Exception e) {
logger.error(e.getMessage());
} finally {
// 确保producer被正确关闭
if (producer != null) {
producer.shutdown();
}
}
}
/**
* 异步发送消息
* 发送后立即返回,通过回调获取结果
*/
public static void asyncSend() throws IOException, InterruptedException {
DefaultMQProducer producer = null;
try {
producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(nameServer);
producer.start();
Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 异步发送,通过回调处理结果
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("异步发送成功消息ID%s%n", sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("异步发送异常:%s%n", e.getMessage());
logger.error(e.getMessage());
}
}, 10000);
} catch (Exception e) {
logger.error(e.getMessage());
} finally {
// 注意异步发送时不能立即关闭producer需要等待回调完成
// 实际应用中应该有更好的生命周期管理
Thread.sleep(1000); // 等待回调
if (producer != null) {
producer.shutdown();
}
}
}
/**
* 发送顺序消息
* 通过MessageQueueSelector确保同一个orderId的消息进入同一个队列
*/
public static void orderSend() {
DefaultMQProducer producer = null;
try {
producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(nameServer);
producer.setSendMsgTimeout(10000);
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
// 发送10条消息相同orderId的消息会进入同一个队列
for (int i = 0; i < 10; i++) {
int orderId = i % 10;
Message msg = new Message(topic,
tags[i % tags.length],
"KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg,
(List<MessageQueue> mqs, Message msg1, Object arg) -> {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}, orderId);
System.out.printf("顺序消息发送结果:%s%n", sendResult);
}
} catch (Exception e) {
logger.error(e.getMessage());
} finally {
if (producer != null) {
producer.shutdown();
}
}
}
/**
* 消息消费者
* 使用拉模式消费消息
*/
public static void consumerPull() {
DefaultLitePullConsumer consumer = null;
try {
consumer = new DefaultLitePullConsumer(consumerGroup);
consumer.setNamesrvAddr(nameServer);
consumer.setAutoCommit(false); // 手动提交消费位点
consumer.subscribe(topic, "*"); // 订阅所有Tag
consumer.setPullBatchSize(20); // 设置批量拉取大小
consumer.start();
while (true) {
List<MessageExt> messageExts = consumer.poll();
if (!messageExts.isEmpty()) {
for (MessageExt msg : messageExts) {
String content = new String(msg.getBody());
System.out.printf("消费消息:%sTag%s%n", content, msg.getTags());
}
} else {
System.out.println("本轮没有找到需要消费的消息!");
}
consumer.commit();
}
} catch (Exception e) {
logger.error(e.getMessage());
} finally {
if (consumer != null) {
consumer.shutdown();
}
}
}
public static void main(String[] args) {
// 模拟发送
logger.info("当前服务器:" + nameServer);
try {
// 测试同步发送
logger.info("=== 测试同步发送 ===");
syncSend();
Thread.sleep(1000);
// 测试异步发送
logger.info("\n=== 测试异步发送 ===");
asyncSend();
Thread.sleep(1000);
// 测试顺序消息发送
logger.info("\n=== 测试顺序消息发送 ===");
orderSend();
Thread.sleep(1000);
// 测试消费者拉取消息
logger.info("\n=== 测试消息消费 ===");
// 注意这是一个无限循环的方法这里只运行5秒钟作为演示
Thread thread = new Thread(() -> {
try {
consumerPull();
} catch (Exception e) {
logger.error(e.getMessage());
}
});
thread.start();
Thread.sleep(5000);
shutdown(); // 关闭资源
System.exit(0);
} catch (Exception e) {
logger.error(e.getMessage());
}
}
}