diff --git a/dsAiSupport/Doc/5、安装RocketMQ.md b/dsAiSupport/Doc/5、安装RocketMQ.md index fc1d82ca..98d00131 100644 --- a/dsAiSupport/Doc/5、安装RocketMQ.md +++ b/dsAiSupport/Doc/5、安装RocketMQ.md @@ -82,3 +82,20 @@ sh bin/mqshutdown broker sh bin/mqshutdown namesrv ``` + +#### 主题维护 +```shell +cd /usr/local/rocketmq/bin + +# 创建主题 +./mqadmin updateTopic -n 10.10.14.210:9876 -c DefaultCluster -t HuangHaiTest +# 创建或重置一个名为 TOPIC_NAME 的 topic,并将该 broker 组的读写队列数皆设为 4 个。 +./mqadmin updateTopic -n 10.10.14.210:9876 -b 10.10.14.210:10911 -t HuangHaiTest -w 4 -r 4 + + +# 查看主题 +./mqadmin topicList -n 10.10.14.210:9876 -c + +# 删除主题 +./mqadmin deleteTopic -n 10.10.14.210:9876 -c DefaultCluster -t HuangHaiTest +``` \ No newline at end of file diff --git a/dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/TestRocketMQ.java b/dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/TestRocketMQ.java deleted file mode 100644 index cc321c6c..00000000 --- a/dsAiSupport/src/main/java/com/dsideal/aiSupport/Test/TestRocketMQ.java +++ /dev/null @@ -1,142 +0,0 @@ -package com.dsideal.aiSupport.Test; - -import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.client.producer.MessageQueueSelector; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.remoting.common.RemotingHelper; - -import java.io.IOException; -import java.util.List; - -//Java整合RocketMQ实现生产消费 -//https://blog.csdn.net/qq_28314431/article/details/128452607 - -public class TestRocketMQ { - private final static String nameServer = "10.10.14.210:9876"; - - private final static String producerGroup = "my_group"; - private final static String consumerGroup = "my_group"; - private final static String topic = "topic-test"; - - //同步发送 - public static void syncSend() { - try { - // 初始化一个producer并设置Producer group name - DefaultMQProducer producer = new DefaultMQProducer(producerGroup); - // 设置NameServer地址 - producer.setNamesrvAddr(nameServer); - // 启动producer - producer.start(); - // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤 - Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET)); - // 利用producer进行发送,并同步等待发送结果 - SendResult sendResult = producer.send(msg, 10000); - System.out.printf("%s%n", sendResult); - // 一旦producer不再使用,关闭producer - producer.shutdown(); - } catch (Exception e) { - e.printStackTrace(); - } - } - - //异步发送 - public static void asyncSend() throws IOException { - try { - // 初始化一个producer并设置Producer group name - DefaultMQProducer producer = new DefaultMQProducer(producerGroup); - // 设置NameServer地址 - producer.setNamesrvAddr(nameServer); - // 启动producer - producer.start(); - // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤 - Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET)); - // 异步发送消息, 发送结果通过callback返回给客户端 - producer.send(msg, new SendCallback() { - public void onSuccess(SendResult sendResult) { - System.out.printf("OK %s %n", - sendResult.getMsgId()); - } - - public void onException(Throwable e) { - System.out.printf("Exception %s %n", e); - e.printStackTrace(); - } - }, 10000); - } catch (Exception e) { - e.printStackTrace(); - } - System.in.read(); - } - - //顺序消息 - public static void orderSend() { - try { - // 初始化一个producer并设置Producer group name - DefaultMQProducer producer = new DefaultMQProducer(producerGroup); - // 设置NameServer地址 - producer.setNamesrvAddr(nameServer); - producer.setSendMsgTimeout(10000); - // 启动producer - producer.start(); - String[] tags = new String[]{"TagA", "TagB", "TagC"}; - for (int i = 0; i < 10; i++) { - int orderId = i % 10; - Message msg = new Message(topic, tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); - SendResult sendResult = producer.send(msg, new MessageQueueSelector() { - public MessageQueue select(List mqs, Message msg, Object arg) { - Integer id = (Integer) arg; - int index = id % mqs.size(); - return mqs.get(index); - } - }, orderId); - System.out.printf("%s%n", sendResult); - } - } catch (Exception e) { - e.printStackTrace(); - } - } - - - public static void consumerPull() { - try { - DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(consumerGroup); - consumer.setNamesrvAddr(nameServer); - //关闭自动提交 - consumer.setAutoCommit(false); - consumer.subscribe(topic, "*"); - consumer.setPullBatchSize(20); - consumer.start(); - while (true) { - List messageExts = consumer.poll(); - if (messageExts.size() > 0) { - String str = new String(messageExts.get(0).getBody()); - System.out.println(str); - } else { - System.out.println("本轮没有找到需要消费的消息!"); - } - consumer.commitSync(); - } - } catch (Exception e) { - e.printStackTrace(); - } - } - - public static void main(String[] args) throws IOException { - //同步发送 - syncSend(); - - //异步发送 - //asyncSend(); - - //顺序消息 - orderSend(); - - //拉消费 - //consumerPull(); - } -} diff --git a/dsAiSupport/src/main/java/com/dsideal/aiSupport/Util/RocketMQUtil.java b/dsAiSupport/src/main/java/com/dsideal/aiSupport/Util/RocketMQUtil.java new file mode 100644 index 00000000..78bc2e03 --- /dev/null +++ b/dsAiSupport/src/main/java/com/dsideal/aiSupport/Util/RocketMQUtil.java @@ -0,0 +1,180 @@ +package com.dsideal.aiSupport.Util; + +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.common.RemotingHelper; + +import java.io.IOException; +import java.util.List; + +/** + * RocketMQ工具类,提供消息发送和消费的常用方法 + * 包含同步发送、异步发送、顺序消息发送和消息消费等功能 + */ +public class RocketMQUtil { + // RocketMQ服务器地址 + private final static String nameServer = "10.10.14.210:9876"; + // 生产者组名 + private final static String producerGroup = "my_group"; + // 消费者组名 + private final static String consumerGroup = "my_group"; + // 主题名称 + private final static String topic = "topic-test2"; + + /** + * 同步发送消息 + * 发送完消息后等待服务器响应结果 + */ + public static void syncSend() { + DefaultMQProducer producer = null; + try { + // 初始化生产者 + producer = new DefaultMQProducer(producerGroup); + producer.setNamesrvAddr(nameServer); + producer.start(); + + // 创建消息对象 + Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET)); + // 发送消息并等待结果 + SendResult sendResult = producer.send(msg, 10000); + System.out.printf("同步发送结果:%s%n", sendResult); + } catch (Exception e) { + e.printStackTrace(); + } finally { + // 确保producer被正确关闭 + if (producer != null) { + producer.shutdown(); + } + } + } + + /** + * 异步发送消息 + * 发送后立即返回,通过回调获取结果 + */ + public static void asyncSend() throws IOException, InterruptedException { + DefaultMQProducer producer = null; + try { + producer = new DefaultMQProducer(producerGroup); + producer.setNamesrvAddr(nameServer); + producer.start(); + + Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET)); + // 异步发送,通过回调处理结果 + producer.send(msg, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + System.out.printf("异步发送成功,消息ID:%s%n", sendResult.getMsgId()); + } + + @Override + public void onException(Throwable e) { + System.out.printf("异步发送异常:%s%n", e.getMessage()); + e.printStackTrace(); + } + }, 10000); + } catch (Exception e) { + e.printStackTrace(); + } finally { + // 注意:异步发送时不能立即关闭producer,需要等待回调完成 + // 实际应用中应该有更好的生命周期管理 + Thread.sleep(1000); // 等待回调 + if (producer != null) { + producer.shutdown(); + } + } + } + + /** + * 发送顺序消息 + * 通过MessageQueueSelector确保同一个orderId的消息进入同一个队列 + */ + public static void orderSend() { + DefaultMQProducer producer = null; + try { + producer = new DefaultMQProducer(producerGroup); + producer.setNamesrvAddr(nameServer); + producer.setSendMsgTimeout(10000); + producer.start(); + + String[] tags = new String[]{"TagA", "TagB", "TagC"}; + // 发送10条消息,相同orderId的消息会进入同一个队列 + for (int i = 0; i < 10; i++) { + int orderId = i % 10; + Message msg = new Message(topic, + tags[i % tags.length], + "KEY" + i, + ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); + + SendResult sendResult = producer.send(msg, + (List mqs, Message msg1, Object arg) -> { + Integer id = (Integer) arg; + int index = id % mqs.size(); + return mqs.get(index); + }, orderId); + System.out.printf("顺序消息发送结果:%s%n", sendResult); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (producer != null) { + producer.shutdown(); + } + } + } + + /** + * 消息消费者 + * 使用拉模式消费消息 + */ + public static void consumerPull() { + DefaultLitePullConsumer consumer = null; + try { + consumer = new DefaultLitePullConsumer(consumerGroup); + consumer.setNamesrvAddr(nameServer); + consumer.setAutoCommit(false); // 手动提交消费位点 + consumer.subscribe(topic, "*"); // 订阅所有Tag + consumer.setPullBatchSize(20); // 设置批量拉取大小 + consumer.start(); + + while (true) { + List messageExts = consumer.poll(); + if (!messageExts.isEmpty()) { + for (MessageExt msg : messageExts) { + String content = new String(msg.getBody()); + System.out.printf("消费消息:%s,Tag:%s%n", content, msg.getTags()); + } + } else { + System.out.println("本轮没有找到需要消费的消息!"); + } + consumer.commitSync(); // 提交消费位点 + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (consumer != null) { + consumer.shutdown(); + } + } + } + + + public static void main(String[] args) throws IOException { + // 测试同步发送 + syncSend(); + + // 测试顺序消息 + //orderSend(); + + // 注意:以下方法在实际使用时应该分开测试 + // asyncSend(); // 异步发送测试 + // consumerPull(); // 消费者测试 + + + } +}