|
|
@ -113,7 +113,7 @@ public class RocketMqKit {
|
|
|
|
SendResult sendResult = producer.send(msg, 10000);
|
|
|
|
SendResult sendResult = producer.send(msg, 10000);
|
|
|
|
System.out.printf("同步发送结果:%s%n", sendResult);
|
|
|
|
System.out.printf("同步发送结果:%s%n", sendResult);
|
|
|
|
} catch (Exception e) {
|
|
|
|
} catch (Exception e) {
|
|
|
|
e.printStackTrace();
|
|
|
|
logger.error(e.getMessage());
|
|
|
|
} finally {
|
|
|
|
} finally {
|
|
|
|
// 确保producer被正确关闭
|
|
|
|
// 确保producer被正确关闭
|
|
|
|
if (producer != null) {
|
|
|
|
if (producer != null) {
|
|
|
@ -144,11 +144,11 @@ public class RocketMqKit {
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public void onException(Throwable e) {
|
|
|
|
public void onException(Throwable e) {
|
|
|
|
System.out.printf("异步发送异常:%s%n", e.getMessage());
|
|
|
|
System.out.printf("异步发送异常:%s%n", e.getMessage());
|
|
|
|
e.printStackTrace();
|
|
|
|
logger.error(e.getMessage());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}, 10000);
|
|
|
|
}, 10000);
|
|
|
|
} catch (Exception e) {
|
|
|
|
} catch (Exception e) {
|
|
|
|
e.printStackTrace();
|
|
|
|
logger.error(e.getMessage());
|
|
|
|
} finally {
|
|
|
|
} finally {
|
|
|
|
// 注意:异步发送时不能立即关闭producer,需要等待回调完成
|
|
|
|
// 注意:异步发送时不能立即关闭producer,需要等待回调完成
|
|
|
|
// 实际应用中应该有更好的生命周期管理
|
|
|
|
// 实际应用中应该有更好的生命周期管理
|
|
|
@ -175,21 +175,21 @@ public class RocketMqKit {
|
|
|
|
// 发送10条消息,相同orderId的消息会进入同一个队列
|
|
|
|
// 发送10条消息,相同orderId的消息会进入同一个队列
|
|
|
|
for (int i = 0; i < 10; i++) {
|
|
|
|
for (int i = 0; i < 10; i++) {
|
|
|
|
int orderId = i % 10;
|
|
|
|
int orderId = i % 10;
|
|
|
|
Message msg = new Message(topic,
|
|
|
|
Message msg = new Message(topic,
|
|
|
|
tags[i % tags.length],
|
|
|
|
tags[i % tags.length],
|
|
|
|
"KEY" + i,
|
|
|
|
"KEY" + i,
|
|
|
|
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
|
|
|
|
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
|
|
|
|
|
|
|
|
|
|
|
|
SendResult sendResult = producer.send(msg,
|
|
|
|
SendResult sendResult = producer.send(msg,
|
|
|
|
(List<MessageQueue> mqs, Message msg1, Object arg) -> {
|
|
|
|
(List<MessageQueue> mqs, Message msg1, Object arg) -> {
|
|
|
|
Integer id = (Integer) arg;
|
|
|
|
Integer id = (Integer) arg;
|
|
|
|
int index = id % mqs.size();
|
|
|
|
int index = id % mqs.size();
|
|
|
|
return mqs.get(index);
|
|
|
|
return mqs.get(index);
|
|
|
|
}, orderId);
|
|
|
|
}, orderId);
|
|
|
|
System.out.printf("顺序消息发送结果:%s%n", sendResult);
|
|
|
|
System.out.printf("顺序消息发送结果:%s%n", sendResult);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} catch (Exception e) {
|
|
|
|
} catch (Exception e) {
|
|
|
|
e.printStackTrace();
|
|
|
|
logger.error(e.getMessage());
|
|
|
|
} finally {
|
|
|
|
} finally {
|
|
|
|
if (producer != null) {
|
|
|
|
if (producer != null) {
|
|
|
|
producer.shutdown();
|
|
|
|
producer.shutdown();
|
|
|
@ -221,10 +221,10 @@ public class RocketMqKit {
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
System.out.println("本轮没有找到需要消费的消息!");
|
|
|
|
System.out.println("本轮没有找到需要消费的消息!");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
consumer.commitSync(); // 提交消费位点
|
|
|
|
consumer.commit();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} catch (Exception e) {
|
|
|
|
} catch (Exception e) {
|
|
|
|
e.printStackTrace();
|
|
|
|
logger.error(e.getMessage());
|
|
|
|
} finally {
|
|
|
|
} finally {
|
|
|
|
if (consumer != null) {
|
|
|
|
if (consumer != null) {
|
|
|
|
consumer.shutdown();
|
|
|
|
consumer.shutdown();
|
|
|
@ -234,7 +234,7 @@ public class RocketMqKit {
|
|
|
|
|
|
|
|
|
|
|
|
public static void main(String[] args) {
|
|
|
|
public static void main(String[] args) {
|
|
|
|
// 模拟发送
|
|
|
|
// 模拟发送
|
|
|
|
logger.info("当前服务器:"+nameServer);
|
|
|
|
logger.info("当前服务器:" + nameServer);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
try {
|
|
|
@ -260,7 +260,7 @@ public class RocketMqKit {
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
consumerPull();
|
|
|
|
consumerPull();
|
|
|
|
} catch (Exception e) {
|
|
|
|
} catch (Exception e) {
|
|
|
|
e.printStackTrace();
|
|
|
|
logger.error(e.getMessage());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
});
|
|
|
|
thread.start();
|
|
|
|
thread.start();
|
|
|
@ -268,7 +268,7 @@ public class RocketMqKit {
|
|
|
|
shutdown(); // 关闭资源
|
|
|
|
shutdown(); // 关闭资源
|
|
|
|
System.exit(0);
|
|
|
|
System.exit(0);
|
|
|
|
} catch (Exception e) {
|
|
|
|
} catch (Exception e) {
|
|
|
|
e.printStackTrace();
|
|
|
|
logger.error(e.getMessage());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|