|
|
|
@ -0,0 +1,142 @@
|
|
|
|
|
package com.charge.tools;
|
|
|
|
|
|
|
|
|
|
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
|
|
|
|
|
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
|
|
|
|
import org.apache.rocketmq.client.producer.MessageQueueSelector;
|
|
|
|
|
import org.apache.rocketmq.client.producer.SendCallback;
|
|
|
|
|
import org.apache.rocketmq.client.producer.SendResult;
|
|
|
|
|
import org.apache.rocketmq.common.message.Message;
|
|
|
|
|
import org.apache.rocketmq.common.message.MessageExt;
|
|
|
|
|
import org.apache.rocketmq.common.message.MessageQueue;
|
|
|
|
|
import org.apache.rocketmq.remoting.common.RemotingHelper;
|
|
|
|
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
|
|
|
|
|
//Java整合RocketMQ实现生产消费
|
|
|
|
|
//https://blog.csdn.net/qq_28314431/article/details/128452607
|
|
|
|
|
|
|
|
|
|
public class TestRocketMQ {
|
|
|
|
|
private final static String nameServer = "10.10.14.210:9876";
|
|
|
|
|
|
|
|
|
|
private final static String producerGroup = "my_group";
|
|
|
|
|
private final static String consumerGroup = "my_group";
|
|
|
|
|
private final static String topic = "topic-test";
|
|
|
|
|
|
|
|
|
|
//同步发送
|
|
|
|
|
public static void syncSend() {
|
|
|
|
|
try {
|
|
|
|
|
// 初始化一个producer并设置Producer group name
|
|
|
|
|
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
|
|
|
|
|
// 设置NameServer地址
|
|
|
|
|
producer.setNamesrvAddr(nameServer);
|
|
|
|
|
// 启动producer
|
|
|
|
|
producer.start();
|
|
|
|
|
// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
|
|
|
|
|
Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
|
|
|
|
|
// 利用producer进行发送,并同步等待发送结果
|
|
|
|
|
SendResult sendResult = producer.send(msg, 10000);
|
|
|
|
|
System.out.printf("%s%n", sendResult);
|
|
|
|
|
// 一旦producer不再使用,关闭producer
|
|
|
|
|
producer.shutdown();
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//异步发送
|
|
|
|
|
public static void asyncSend() throws IOException {
|
|
|
|
|
try {
|
|
|
|
|
// 初始化一个producer并设置Producer group name
|
|
|
|
|
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
|
|
|
|
|
// 设置NameServer地址
|
|
|
|
|
producer.setNamesrvAddr(nameServer);
|
|
|
|
|
// 启动producer
|
|
|
|
|
producer.start();
|
|
|
|
|
// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
|
|
|
|
|
Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
|
|
|
|
|
// 异步发送消息, 发送结果通过callback返回给客户端
|
|
|
|
|
producer.send(msg, new SendCallback() {
|
|
|
|
|
public void onSuccess(SendResult sendResult) {
|
|
|
|
|
System.out.printf("OK %s %n",
|
|
|
|
|
sendResult.getMsgId());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void onException(Throwable e) {
|
|
|
|
|
System.out.printf("Exception %s %n", e);
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
}
|
|
|
|
|
}, 10000);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
}
|
|
|
|
|
System.in.read();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//顺序消息
|
|
|
|
|
public static void orderSend() {
|
|
|
|
|
try {
|
|
|
|
|
// 初始化一个producer并设置Producer group name
|
|
|
|
|
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
|
|
|
|
|
// 设置NameServer地址
|
|
|
|
|
producer.setNamesrvAddr(nameServer);
|
|
|
|
|
producer.setSendMsgTimeout(10000);
|
|
|
|
|
// 启动producer
|
|
|
|
|
producer.start();
|
|
|
|
|
String[] tags = new String[]{"TagA", "TagB", "TagC"};
|
|
|
|
|
for (int i = 0; i < 10; i++) {
|
|
|
|
|
int orderId = i % 10;
|
|
|
|
|
Message msg = new Message(topic, tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
|
|
|
|
|
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
|
|
|
|
|
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
|
|
|
|
|
Integer id = (Integer) arg;
|
|
|
|
|
int index = id % mqs.size();
|
|
|
|
|
return mqs.get(index);
|
|
|
|
|
}
|
|
|
|
|
}, orderId);
|
|
|
|
|
System.out.printf("%s%n", sendResult);
|
|
|
|
|
}
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public static void consumerPull() {
|
|
|
|
|
try {
|
|
|
|
|
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(consumerGroup);
|
|
|
|
|
consumer.setNamesrvAddr(nameServer);
|
|
|
|
|
//关闭自动提交
|
|
|
|
|
consumer.setAutoCommit(false);
|
|
|
|
|
consumer.subscribe(topic, "*");
|
|
|
|
|
consumer.setPullBatchSize(20);
|
|
|
|
|
consumer.start();
|
|
|
|
|
while (true) {
|
|
|
|
|
List<MessageExt> messageExts = consumer.poll();
|
|
|
|
|
if (messageExts.size() > 0) {
|
|
|
|
|
String str = new String(messageExts.get(0).getBody());
|
|
|
|
|
System.out.println(str);
|
|
|
|
|
} else {
|
|
|
|
|
System.out.println("本轮没有找到需要消费的消息!");
|
|
|
|
|
}
|
|
|
|
|
consumer.commitSync();
|
|
|
|
|
}
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static void main(String[] args) throws IOException {
|
|
|
|
|
//同步发送
|
|
|
|
|
syncSend();
|
|
|
|
|
|
|
|
|
|
//异步发送
|
|
|
|
|
//asyncSend();
|
|
|
|
|
|
|
|
|
|
//顺序消息
|
|
|
|
|
//orderSend();
|
|
|
|
|
|
|
|
|
|
//拉消费
|
|
|
|
|
consumerPull();
|
|
|
|
|
}
|
|
|
|
|
}
|