main
黄海 2 months ago
parent 3a0f8ab68d
commit c8c4617fda

@ -82,3 +82,20 @@ sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
```
#### 主题维护
```shell
cd /usr/local/rocketmq/bin
# 创建主题
./mqadmin updateTopic -n 10.10.14.210:9876 -c DefaultCluster -t HuangHaiTest
# 创建或重置一个名为 TOPIC_NAME 的 topic并将该 broker 组的读写队列数皆设为 4 个。
./mqadmin updateTopic -n 10.10.14.210:9876 -b 10.10.14.210:10911 -t HuangHaiTest -w 4 -r 4
# 查看主题
./mqadmin topicList -n 10.10.14.210:9876 -c
# 删除主题
./mqadmin deleteTopic -n 10.10.14.210:9876 -c DefaultCluster -t HuangHaiTest
```

@ -1,142 +0,0 @@
package com.dsideal.aiSupport.Test;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.IOException;
import java.util.List;
//Java整合RocketMQ实现生产消费
//https://blog.csdn.net/qq_28314431/article/details/128452607
public class TestRocketMQ {
private final static String nameServer = "10.10.14.210:9876";
private final static String producerGroup = "my_group";
private final static String consumerGroup = "my_group";
private final static String topic = "topic-test";
//同步发送
public static void syncSend() {
try {
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// 设置NameServer地址
producer.setNamesrvAddr(nameServer);
// 启动producer
producer.start();
// 创建一条消息并指定topic、tag、body等信息tag可以理解成标签对消息进行再归类RocketMQ可以在消费端对tag进行过滤
Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 利用producer进行发送并同步等待发送结果
SendResult sendResult = producer.send(msg, 10000);
System.out.printf("%s%n", sendResult);
// 一旦producer不再使用关闭producer
producer.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
//异步发送
public static void asyncSend() throws IOException {
try {
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// 设置NameServer地址
producer.setNamesrvAddr(nameServer);
// 启动producer
producer.start();
// 创建一条消息并指定topic、tag、body等信息tag可以理解成标签对消息进行再归类RocketMQ可以在消费端对tag进行过滤
Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 异步发送消息, 发送结果通过callback返回给客户端
producer.send(msg, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.printf("OK %s %n",
sendResult.getMsgId());
}
public void onException(Throwable e) {
System.out.printf("Exception %s %n", e);
e.printStackTrace();
}
}, 10000);
} catch (Exception e) {
e.printStackTrace();
}
System.in.read();
}
//顺序消息
public static void orderSend() {
try {
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// 设置NameServer地址
producer.setNamesrvAddr(nameServer);
producer.setSendMsgTimeout(10000);
// 启动producer
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 10; i++) {
int orderId = i % 10;
Message msg = new Message(topic, tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void consumerPull() {
try {
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(consumerGroup);
consumer.setNamesrvAddr(nameServer);
//关闭自动提交
consumer.setAutoCommit(false);
consumer.subscribe(topic, "*");
consumer.setPullBatchSize(20);
consumer.start();
while (true) {
List<MessageExt> messageExts = consumer.poll();
if (messageExts.size() > 0) {
String str = new String(messageExts.get(0).getBody());
System.out.println(str);
} else {
System.out.println("本轮没有找到需要消费的消息!");
}
consumer.commitSync();
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
//同步发送
syncSend();
//异步发送
//asyncSend();
//顺序消息
orderSend();
//拉消费
//consumerPull();
}
}

@ -0,0 +1,180 @@
package com.dsideal.aiSupport.Util;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.IOException;
import java.util.List;
/**
* RocketMQ
*
*/
public class RocketMQUtil {
// RocketMQ服务器地址
private final static String nameServer = "10.10.14.210:9876";
// 生产者组名
private final static String producerGroup = "my_group";
// 消费者组名
private final static String consumerGroup = "my_group";
// 主题名称
private final static String topic = "topic-test2";
/**
*
*
*/
public static void syncSend() {
DefaultMQProducer producer = null;
try {
// 初始化生产者
producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(nameServer);
producer.start();
// 创建消息对象
Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息并等待结果
SendResult sendResult = producer.send(msg, 10000);
System.out.printf("同步发送结果:%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 确保producer被正确关闭
if (producer != null) {
producer.shutdown();
}
}
}
/**
*
*
*/
public static void asyncSend() throws IOException, InterruptedException {
DefaultMQProducer producer = null;
try {
producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(nameServer);
producer.start();
Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 异步发送,通过回调处理结果
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("异步发送成功消息ID%s%n", sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("异步发送异常:%s%n", e.getMessage());
e.printStackTrace();
}
}, 10000);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 注意异步发送时不能立即关闭producer需要等待回调完成
// 实际应用中应该有更好的生命周期管理
Thread.sleep(1000); // 等待回调
if (producer != null) {
producer.shutdown();
}
}
}
/**
*
* MessageQueueSelectororderId
*/
public static void orderSend() {
DefaultMQProducer producer = null;
try {
producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(nameServer);
producer.setSendMsgTimeout(10000);
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
// 发送10条消息相同orderId的消息会进入同一个队列
for (int i = 0; i < 10; i++) {
int orderId = i % 10;
Message msg = new Message(topic,
tags[i % tags.length],
"KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg,
(List<MessageQueue> mqs, Message msg1, Object arg) -> {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}, orderId);
System.out.printf("顺序消息发送结果:%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (producer != null) {
producer.shutdown();
}
}
}
/**
*
* 使
*/
public static void consumerPull() {
DefaultLitePullConsumer consumer = null;
try {
consumer = new DefaultLitePullConsumer(consumerGroup);
consumer.setNamesrvAddr(nameServer);
consumer.setAutoCommit(false); // 手动提交消费位点
consumer.subscribe(topic, "*"); // 订阅所有Tag
consumer.setPullBatchSize(20); // 设置批量拉取大小
consumer.start();
while (true) {
List<MessageExt> messageExts = consumer.poll();
if (!messageExts.isEmpty()) {
for (MessageExt msg : messageExts) {
String content = new String(msg.getBody());
System.out.printf("消费消息:%sTag%s%n", content, msg.getTags());
}
} else {
System.out.println("本轮没有找到需要消费的消息!");
}
consumer.commitSync(); // 提交消费位点
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (consumer != null) {
consumer.shutdown();
}
}
}
public static void main(String[] args) throws IOException {
// 测试同步发送
syncSend();
// 测试顺序消息
//orderSend();
// 注意:以下方法在实际使用时应该分开测试
// asyncSend(); // 异步发送测试
// consumerPull(); // 消费者测试
}
}
Loading…
Cancel
Save